diff --git a/CHANGELOG.md b/CHANGELOG.md index 44385080..5686bb29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,21 @@ Versioning](https://semver.org/spec/v2.0.0.html). - Changed `COMPATIBLE_VERSION_REQ` to ">=0.24.0-alpha.1,<0.25.0". - Added migration function `migrate_0_23_0_to_0_24_0_op_log`. This function performs a migration to change the key and value of `Oplog`. +- Added minimal mode to improve behavior in remote configuration mode. + - If local configuration is not used when running Giganto, it will run in + minimal mode, with only the GraphQL server running. Retain task, peer server + task, ingest task, publish task, and DB migration are not executed. + - In minimal mode, the APIs provided by the GraphQL server are limited. In + this mode, only the APIs provided by `graphql::status` are available. + - If the configuration is received successfully via the `setConfig` GraphQL + API, it will switch to normal mode, where all task and DB migrations run + as normal, just like when running Giganto with local settings. + +### Removed + +- Removed OS-specific configuration directory. + - Linux: $HOME/.config/giganto/config.toml + - macOS: $HOME/Library/Application Support/com.cluml.giganto/config.toml ## [0.23.0] - 2024-11-21 diff --git a/Cargo.lock b/Cargo.lock index 85afd84c..b91f8c50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "Inflector" @@ -765,27 +765,6 @@ dependencies = [ "crypto-common", ] -[[package]] -name = "directories" -version = "5.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a49173b84e034382284f27f1af4dcbbd231ffa358c0fe316541a7337f376a35" -dependencies = [ - "dirs-sys", -] - -[[package]] -name = "dirs-sys" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" -dependencies = [ - "libc", - "option-ext", - "redox_users", - "windows-sys 0.48.0", -] - [[package]] name = "displaydoc" version = "0.2.5" @@ -1037,7 +1016,6 @@ dependencies = [ "ctrlc", "data-encoding", "deluxe", - "directories", "futures-util", "giganto-client", "graphql_client", @@ -1777,16 +1755,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "libredox" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" -dependencies = [ - "bitflags 2.6.0", - "libc", -] - [[package]] name = "librocksdb-sys" version = "0.16.0+8.10.0" @@ -2173,12 +2141,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "option-ext" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" - [[package]] name = "overload" version = "0.1.1" @@ -2573,17 +2535,6 @@ dependencies = [ "bitflags 2.6.0", ] -[[package]] -name = "redox_users" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" -dependencies = [ - "getrandom", - "libredox", - "thiserror 1.0.69", -] - [[package]] name = "regex" version = "1.11.1" diff --git a/Cargo.toml b/Cargo.toml index 07a15c0c..dd015f3a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,6 @@ config = { version = "0.14", features = ["toml"], default-features = false } ctrlc = { version = "3", features = ["termination"] } data-encoding = "2" deluxe = "0.5" -directories = "5" futures-util = "0.3" giganto-client = { git = "https://github.com/aicers/giganto-client.git", tag = "0.21.0" } graphql_client = "0.14" diff --git a/src/graphql.rs b/src/graphql.rs index 90af267b..ceacfde8 100644 --- a/src/graphql.rs +++ b/src/graphql.rs @@ -66,6 +66,9 @@ pub struct Query( netflow::NetflowQuery, ); +#[derive(Default, MergedObject)] +pub struct MinimalQuery(status::StatusQuery); + #[derive(Default, MergedObject)] pub struct Mutation(status::ConfigMutation); @@ -141,7 +144,8 @@ pub trait ClusterSortKey { fn secondary(&self) -> Option<&str>; } -pub type Schema = async_graphql::Schema; +type Schema = async_graphql::Schema; +type MinimalSchema = async_graphql::Schema; type ConnArgs = (Vec<(Box<[u8]>, T)>, bool, bool); pub struct NodeName(pub String); @@ -164,7 +168,7 @@ pub fn schema( notify_terminate: Arc, ack_transmission_cnt: AckTransmissionCount, is_local_config: bool, - settings: Settings, + settings: Option, ) -> Schema { Schema::build(Query::default(), Mutation::default(), EmptySubscription) .data(node_name) @@ -184,6 +188,28 @@ pub fn schema( .finish() } +pub fn minimal_schema( + reload_tx: Sender, + notify_reboot: Arc, + notify_power_off: Arc, + notify_terminate: Arc, + is_local_config: bool, + settings: Option, +) -> MinimalSchema { + MinimalSchema::build( + MinimalQuery::default(), + Mutation::default(), + EmptySubscription, + ) + .data(reload_tx) + .data(TerminateNotify(notify_terminate)) + .data(RebootNotify(notify_reboot)) + .data(PowerOffNotify(notify_power_off)) + .data(is_local_config) + .data(settings) + .finish() +} + /// The default page size for connections when neither `first` nor `last` is /// provided. Maximum size: 100. const MAXIMUM_PAGE_SIZE: usize = 100; @@ -1779,7 +1805,7 @@ mod tests { let notify_reboot = Arc::new(Notify::new()); let notify_power_off = Arc::new(Notify::new()); let notify_terminate = Arc::new(Notify::new()); - let settings = Settings::new().unwrap(); + let settings = Settings::from_file("tests/config.toml").unwrap(); let schema = schema( NodeName("giganto1".to_string()), db.clone(), @@ -1794,7 +1820,7 @@ mod tests { notify_terminate, Arc::new(RwLock::new(1024)), is_local_config, - settings, + Some(settings), ); Self { diff --git a/src/graphql/status.rs b/src/graphql/status.rs index 55f3ea33..2cca5a3f 100644 --- a/src/graphql/status.rs +++ b/src/graphql/status.rs @@ -166,9 +166,11 @@ impl StatusQuery { if *is_local { Err(anyhow!("Config is local").into()) } else { - let s = ctx.data::()?; - - Ok(s.config.clone()) + let s = ctx.data::>()?; + if let Some(settings) = s { + return Ok(settings.config.clone()); + } + Err(anyhow!("The config doesn't exist and needs to be set up remotely.").into()) } } @@ -191,11 +193,13 @@ impl ConfigMutation { let config_draft: Config = toml::from_str(&draft)?; - let s = ctx.data::()?; + let s = ctx.data::>()?; - if s.config == config_draft { - info!("No changes."); - return Err("No changes".to_string().into()); + if let Some(settings) = s { + if settings.config == config_draft { + info!("No changes."); + return Err("No changes".to_string().into()); + } } let reload_tx = ctx.data::>()?; @@ -247,7 +251,11 @@ pub fn settings_to_doc(settings: &Settings) -> Result { pub fn write_toml_file(doc: &DocumentMut, path: &str) -> Result<()> { let output = doc.to_string(); - let mut config_file = OpenOptions::new().write(true).truncate(true).open(path)?; + let mut config_file = OpenOptions::new() + .write(true) + .truncate(true) + .create(true) + .open(path)?; writeln!(config_file, "{output}")?; Ok(()) } @@ -438,8 +446,8 @@ mod tests { max_mb_of_level_base = 512 num_of_thread = 8 max_sub_compactions = 2 - addr_to_peers = "127.0.0.1:48383" - peers = [{ addr = "127.0.0.1:60192", hostname = "node2" }] + addr_to_peers = "127.0.0.1:48382" + peers = [{ addr = "127.0.0.1:60193", hostname = "node3" }] "# .to_string() } diff --git a/src/ingest.rs b/src/ingest.rs index c893456e..ff3207c3 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -110,6 +110,7 @@ impl Server { runtime_ingest_sensors, rx, notify_sensor, + notify_shutdown.clone(), )); let shutdown_signal = Arc::new(AtomicBool::new(false)); @@ -1074,6 +1075,7 @@ async fn check_sensors_conn( runtime_ingest_sensors: RunTimeIngestSensors, mut rx: Receiver, notify_sensor: Option>, + notify_shutdown: Arc, ) -> Result<()> { let mut itv = time::interval(time::Duration::from_secs(SENSOR_INTERVAL)); itv.reset(); @@ -1121,8 +1123,14 @@ async fn check_sensors_conn( } } } + + () = notify_shutdown.notified() => { + break; + }, } } + + Ok(()) } pub struct NetworkKey { diff --git a/src/main.rs b/src/main.rs index ad41e205..2fab36b8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,22 +9,22 @@ mod web; use std::{ collections::{HashMap, HashSet}, - env, fs, + fs, + net::SocketAddr, path::Path, - process::exit, sync::{Arc, Mutex}, - time::{Duration, Instant}, + time::Duration, }; use anyhow::{anyhow, bail, Context, Result}; use chrono::{DateTime, Utc}; use clap::Parser; +use graphql::status::{settings_to_doc, write_toml_file}; use peer::{PeerIdentity, PeerIdents, PeerInfo, Peers}; use quinn::Connection; -use rocksdb::DB; use rustls::pki_types::{CertificateDer, PrivateKeyDer}; -use settings::Settings; -use storage::Database; +use settings::{Settings, DEFAULT_GRAPHQL_SRV_ADDR}; +use storage::{db_path_and_option, repair_db, Database}; use tokio::{ runtime, select, sync::{ @@ -49,6 +49,7 @@ use crate::{ const ONE_DAY: Duration = Duration::from_secs(60 * 60 * 24); const WAIT_SHUTDOWN: u64 = 15; +const DEFAULT_TOML: &str = "/opt/clumit/conf/giganto.toml"; pub type PcapSensors = Arc>>>; pub type IngestSensors = Arc>>; @@ -61,13 +62,14 @@ pub type AckTransmissionCount = Arc>; async fn main() -> Result<()> { let args = Args::parse(); let is_local_config = args.is_local(); - let mut settings = if let Some(config_filename) = args.config { - Settings::from_file(&config_filename)? - } else { - Settings::new()? - }; - - let cfg_path = settings.cfg_path.clone(); + let mut settings = args + .config + .as_ref() + .map(|config_filename| { + Settings::from_file(config_filename) + .map_err(|e| anyhow!("Failed to read local config file: {e}")) + }) + .transpose()?; let cert_pem = fs::read(&args.cert) .with_context(|| format!("failed to read certificate file: {}", args.cert))?; @@ -83,41 +85,27 @@ async fn main() -> Result<()> { root: root_cert.clone(), }); - let _guard = init_tracing(&settings.config.log_dir)?; - - let db_path = storage::data_dir_to_db_path(&settings.config.data_dir); - let db_options = crate::storage::DbOptions::new( - settings.config.max_open_files, - settings.config.max_mb_of_level_base, - settings.config.num_of_thread, - settings.config.max_sub_compactions, - ); + // A fix for log handling by local/remote mode will be addressed in the + // https://github.com/aicers/giganto/issues/878 issue. + let _guard = init_tracing(settings.as_ref().map(|s| &*s.config.log_dir)); if args.repair { - if !is_local_config { + if let Some(ref settings) = settings { + repair_db( + &settings.config.data_dir, + settings.config.max_open_files, + settings.config.max_mb_of_level_base, + settings.config.num_of_thread, + settings.config.max_sub_compactions, + ); + } else { bail!("repair is not allowed on remote config"); } - let start = Instant::now(); - let (db_opts, _) = storage::rocksdb_options(&db_options); - info!("repair db start."); - match DB::repair(&db_opts, db_path) { - Ok(()) => info!("repair ok"), - Err(e) => error!("repair error: {e}"), - } - let dur = start.elapsed(); - info!("{}", to_hms(dur)); - exit(0); } let mut is_reboot = false; let mut is_power_off = false; - - if let Err(e) = migrate_data_dir(&settings.config.data_dir, &db_options) { - error!("migration failed: {e}"); - bail!("migration failed") - } - - let database = storage::Database::open(&db_path, &db_options)?; + let mut is_reload_config = false; let notify_terminate = Arc::new(Notify::new()); let r = notify_terminate.clone(); @@ -132,112 +120,167 @@ async fn main() -> Result<()> { .expect("Failed to build request client pool"); loop { - let pcap_sensors = new_pcap_sensors(); - let ingest_sensors = new_ingest_sensors(&database); - let runtime_ingest_sensors = new_runtime_ingest_sensors(); - let stream_direct_channels = new_stream_direct_channels(); - let (peers, peer_idents) = new_peers_data(settings.config.peers.clone()); + // The flag indicates whether the current giganto is in minimal mode or not. + let is_minimal_mode = settings.is_none(); let (reload_tx, mut reload_rx) = mpsc::channel::(1); let notify_shutdown = Arc::new(Notify::new()); let notify_reboot = Arc::new(Notify::new()); let notify_power_off = Arc::new(Notify::new()); - let mut notify_sensor_change = None; - let ack_transmission_cnt = new_ack_transmission_count(settings.config.ack_transmission); - - let schema = graphql::schema( - NodeName(subject_from_cert(&cert)?.1), - database.clone(), - pcap_sensors.clone(), - ingest_sensors.clone(), - peers.clone(), - request_client_pool.clone(), - settings.config.export_dir.clone(), - reload_tx, - notify_reboot.clone(), - notify_power_off.clone(), - notify_terminate.clone(), - ack_transmission_cnt.clone(), - is_local_config, - settings.clone(), - ); + let retain_flag = Arc::new(Mutex::new(false)); + + let database = if let Some(settings) = settings.clone() { + let (db_path, db_options) = db_path_and_option( + &settings.config.data_dir, + settings.config.max_open_files, + settings.config.max_mb_of_level_base, + settings.config.num_of_thread, + settings.config.max_sub_compactions, + ); + + if let Err(e) = migrate_data_dir(&settings.config.data_dir, &db_options) { + error!("migration failed: {e}"); + bail!("migration failed") + } - task::spawn(web::serve( - schema, - settings.config.graphql_srv_addr, - cert_pem.clone(), - key_pem.clone(), - notify_shutdown.clone(), - )); + let database = storage::Database::open(&db_path, &db_options)?; - let retain_flag = Arc::new(Mutex::new(false)); - let db = database.clone(); - let notify_shutdown_copy = notify_shutdown.clone(); - let running_flag = retain_flag.clone(); - std::thread::spawn(move || { - runtime::Builder::new_current_thread() - .enable_io() - .enable_time() - .build() - .expect("Cannot create runtime for retain_periodically.") - .block_on(storage::retain_periodically( - ONE_DAY, - settings.config.retention, - db, - notify_shutdown_copy, - running_flag, - )) - .unwrap_or_else(|e| { - error!("retain_periodically task terminated unexpectedly: {e}"); - }); - }); - - if let Some(addr_to_peers) = settings.config.addr_to_peers { - let peer_server = peer::Peer::new(addr_to_peers, &certs.clone())?; - let notify_sensor = Arc::new(Notify::new()); - task::spawn(peer_server.run( + let pcap_sensors = new_pcap_sensors(); + let ingest_sensors = new_ingest_sensors(&database); + let runtime_ingest_sensors = new_runtime_ingest_sensors(); + let stream_direct_channels = new_stream_direct_channels(); + let (peers, peer_idents) = new_peers_data(settings.config.peers.clone()); + let mut notify_sensor_change = None; + let ack_transmission_cnt = new_ack_transmission_count(settings.config.ack_transmission); + + let schema = graphql::schema( + NodeName(subject_from_cert(&cert)?.1), + database.clone(), + pcap_sensors.clone(), + ingest_sensors.clone(), + peers.clone(), + request_client_pool.clone(), + settings.config.export_dir.clone(), + reload_tx, + notify_reboot.clone(), + notify_power_off.clone(), + notify_terminate.clone(), + ack_transmission_cnt.clone(), + is_local_config, + Some(settings.clone()), + ); + + task::spawn(web::serve( + schema, + settings.config.graphql_srv_addr, + cert_pem.clone(), + key_pem.clone(), + notify_shutdown.clone(), + )); + + let db = database.clone(); + let notify_shutdown_copy = notify_shutdown.clone(); + let running_flag = retain_flag.clone(); + std::thread::spawn(move || { + runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() + .expect("Cannot create runtime for retain_periodically.") + .block_on(storage::retain_periodically( + ONE_DAY, + settings.config.retention, + db, + notify_shutdown_copy, + running_flag, + )) + .unwrap_or_else(|e| { + error!("retain_periodically task terminated unexpectedly: {e}"); + }); + }); + + if let Some(addr_to_peers) = settings.config.addr_to_peers { + let peer_server = peer::Peer::new(addr_to_peers, &certs.clone())?; + let notify_sensor = Arc::new(Notify::new()); + task::spawn(peer_server.run( + ingest_sensors.clone(), + peers.clone(), + peer_idents.clone(), + notify_sensor.clone(), + notify_shutdown.clone(), + settings.clone(), + )); + notify_sensor_change = Some(notify_sensor); + } + + let publish_server = + publish::Server::new(settings.config.publish_srv_addr, &certs.clone()); + task::spawn(publish_server.run( + database.clone(), + pcap_sensors.clone(), + stream_direct_channels.clone(), ingest_sensors.clone(), peers.clone(), peer_idents.clone(), - notify_sensor.clone(), + certs.clone(), notify_shutdown.clone(), + )); + + let ingest_server = + ingest::Server::new(settings.config.ingest_srv_addr, &certs.clone()); + task::spawn(ingest_server.run( + database.clone(), + pcap_sensors, + ingest_sensors, + runtime_ingest_sensors, + stream_direct_channels, + notify_shutdown.clone(), + notify_sensor_change, + ack_transmission_cnt, + )); + + Some(database) + } else { + // wait for remote configuration + info!("Running in minimal mode."); + let schema = graphql::minimal_schema( + reload_tx, + notify_reboot.clone(), + notify_power_off.clone(), + notify_terminate.clone(), + is_local_config, settings.clone(), + ); + + task::spawn(web::serve( + schema, + DEFAULT_GRAPHQL_SRV_ADDR.parse::().expect("The value of DEFAULT_GRAPHQL_SRV_ADDR is [::]:8442. Converting that value to SocketAddr is always valid."), + cert_pem.clone(), + key_pem.clone(), + notify_shutdown.clone(), )); - notify_sensor_change = Some(notify_sensor); - } - let publish_server = publish::Server::new(settings.config.publish_srv_addr, &certs.clone()); - task::spawn(publish_server.run( - database.clone(), - pcap_sensors.clone(), - stream_direct_channels.clone(), - ingest_sensors.clone(), - peers.clone(), - peer_idents.clone(), - certs.clone(), - notify_shutdown.clone(), - )); - - let ingest_server = ingest::Server::new(settings.config.ingest_srv_addr, &certs.clone()); - task::spawn(ingest_server.run( - database.clone(), - pcap_sensors, - ingest_sensors, - runtime_ingest_sensors, - stream_direct_channels, - notify_shutdown.clone(), - notify_sensor_change, - ack_transmission_cnt, - )); + None + }; loop { select! { Some(config_draft) = reload_rx.recv() => { match Settings::from_server(&config_draft) { Ok(mut new_settings) => { - new_settings.cfg_path.clone_from(&cfg_path); - settings = new_settings; - notify_and_wait_shutdown(notify_shutdown.clone()).await; // Wait for the shutdown to complete - break; + // Since the config file can only be reloaded when running without the + // "-c" option, cfg_path is always assigned the "DEFAULT_TOML". + if let Ok(doc) = settings_to_doc(&new_settings){ + if write_toml_file(&doc, DEFAULT_TOML).is_ok(){ + new_settings.cfg_path.clone_from(&Some(DEFAULT_TOML.to_string())); + settings = Some(new_settings); + notify_and_wait_shutdown(is_minimal_mode, notify_shutdown.clone()).await; // Wait for the shutdown to complete + is_reload_config = true; + break; + } + } + error!("Failed to save the new configuration as {DEFAULT_TOML}"); + warn!("Run giganto with the previous config"); + continue; } Err(e) => { error!("Failed to load the new configuration: {e:#}"); @@ -248,26 +291,26 @@ async fn main() -> Result<()> { }, () = notify_terminate.notified() => { info!("Termination signal: giganto daemon exit"); - notify_and_wait_shutdown(notify_shutdown).await; + notify_and_wait_shutdown(is_minimal_mode, notify_shutdown).await; sleep(Duration::from_millis(SERVER_REBOOT_DELAY)).await; return Ok(()); } () = notify_reboot.notified() => { info!("Restarting the system..."); - notify_and_wait_shutdown(notify_shutdown).await; + notify_and_wait_shutdown(is_minimal_mode, notify_shutdown).await; is_reboot = true; break; } () = notify_power_off.notified() => { info!("Power off the system..."); - notify_and_wait_shutdown(notify_shutdown).await; + notify_and_wait_shutdown(is_minimal_mode, notify_shutdown).await; is_power_off = true; break; } } } - if is_reboot || is_power_off { + if is_reboot || is_power_off || is_reload_config { loop { { let retain_flag = retain_flag.lock().unwrap(); @@ -277,10 +320,19 @@ async fn main() -> Result<()> { } sleep(Duration::from_millis(SERVER_REBOOT_DELAY)).await; } - database.shutdown()?; - info!("Before shut down the system, wait {WAIT_SHUTDOWN} seconds..."); - sleep(tokio::time::Duration::from_secs(WAIT_SHUTDOWN)).await; - break; + + if let Some(database) = database { + database.shutdown()?; + } + + if is_reload_config { + info!("Before reloading config, wait {SERVER_REBOOT_DELAY} seconds..."); + is_reload_config = false; + } else { + info!("Before shut down the system, wait {WAIT_SHUTDOWN} seconds..."); + sleep(tokio::time::Duration::from_secs(WAIT_SHUTDOWN)).await; + break; + } } sleep(Duration::from_millis(SERVER_REBOOT_DELAY)).await; } @@ -377,43 +429,49 @@ fn new_peers_data(peers_list: Option>) -> (Peers, PeerIden ) } -fn init_tracing(path: &Path) -> Result { - if !path.exists() { - bail!("Path not found {path:?}"); - } - +fn init_tracing(log_dir: Option<&Path>) -> WorkerGuard { + let subscriber = tracing_subscriber::Registry::default(); let file_name = format!("{}.log", env!("CARGO_PKG_NAME")); - if std::fs::File::create(path.join(&file_name)).is_err() { - bail!("Cannot create file. {}/{file_name}", path.display()); - } - let file_appender = tracing_appender::rolling::never(path, file_name); - let (file_writer, guard) = tracing_appender::non_blocking(file_appender); - - let layer_file = fmt::Layer::default() - .with_ansi(false) - .with_target(false) - .with_writer(file_writer) - .with_filter( - EnvFilter::builder() - .with_default_directive(LevelFilter::INFO.into()) - .from_env_lossy(), + let is_valid_file = + matches!(log_dir, Some(path) if std::fs::File::create(path.join(&file_name)).is_ok()); + + let (layer, guard) = if !is_valid_file || cfg!(debug_assertions) { + let (stdout_writer, stdout_guard) = tracing_appender::non_blocking(std::io::stdout()); + ( + fmt::Layer::default() + .with_ansi(true) + .with_writer(stdout_writer) + .with_filter(EnvFilter::from_default_env()), + stdout_guard, + ) + } else { + let file_appender = tracing_appender::rolling::never( + log_dir.expect("verified by is_valid_file"), + file_name, ); - - let layered_subscriber = tracing_subscriber::registry().with(layer_file); - #[cfg(debug_assertions)] - let layered_subscriber = layered_subscriber.with( - fmt::Layer::default() - .with_ansi(true) - .with_filter(EnvFilter::from_default_env()), - ); - layered_subscriber.init(); - - Ok(guard) + let (file_writer, file_guard) = tracing_appender::non_blocking(file_appender); + ( + fmt::Layer::default() + .with_ansi(false) + .with_target(false) + .with_writer(file_writer) + .with_filter( + EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(), + ), + file_guard, + ) + }; + subscriber.with(layer).init(); + guard } /// Notifies all waiters of `notify_shutdown` and waits for ingest closed notification. -pub async fn notify_and_wait_shutdown(notify_shutdown: Arc) { +pub async fn notify_and_wait_shutdown(is_minimal_mode: bool, notify_shutdown: Arc) { notify_shutdown.notify_waiters(); - notify_shutdown.notified().await; + if !is_minimal_mode { + notify_shutdown.notified().await; + } } diff --git a/src/peer.rs b/src/peer.rs index dcc2cd5e..12044f6b 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -895,7 +895,7 @@ pub mod tests { let file_path = tmp_dir.path().join("config.toml"); File::create(&file_path).unwrap(); - let mut settings = Settings::new().unwrap(); + let mut settings = Settings::from_file("tests/config.toml").unwrap(); settings.cfg_path = Some(file_path.to_str().unwrap().to_string()); // run peer diff --git a/src/settings.rs b/src/settings.rs index 15377a84..2579dbe6 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -10,7 +10,7 @@ use crate::peer::PeerIdentity; const DEFAULT_INGEST_SRV_ADDR: &str = "[::]:38370"; const DEFAULT_PUBLISH_SRV_ADDR: &str = "[::]:38371"; -const DEFAULT_GRAPHQL_SRV_ADDR: &str = "[::]:8442"; +pub const DEFAULT_GRAPHQL_SRV_ADDR: &str = "[::]:8442"; const DEFAULT_INVALID_ADDR_TO_PEERS: &str = "254.254.254.254:38383"; const DEFAULT_ACK_TRANSMISSION: u16 = 1024; const DEFAULT_RETENTION: &str = "100d"; @@ -88,30 +88,6 @@ pub struct Settings { } impl Settings { - /// Creates a new `Settings` instance, populated from the default - /// configuration file if it exists. - pub fn new() -> Result { - let dirs = directories::ProjectDirs::from("com", "cluml", "giganto").expect("unreachable"); - let config_path = dirs.config_dir().join("config.toml"); - if config_path.exists() { - // `config::File` requires a `&str` path, so we can't use `config_path` directly. - if let Some(path) = config_path.to_str() { - Self::from_file(path) - } else { - Err(ConfigError::Message( - "config path must be a valid UTF-8 string".to_string(), - )) - } - } else { - let config: Config = default_config_builder().build()?.try_deserialize()?; - - Ok(Self { - config, - cfg_path: None, - }) - } - } - /// Creates a new `Settings` instance, populated from the given /// configuration file. pub fn from_file(cfg_path: &str) -> Result { @@ -146,19 +122,6 @@ impl Settings { /// Creates a new `ConfigBuilder` instance with the default configuration. fn default_config_builder() -> ConfigBuilder { - let db_dir = - directories::ProjectDirs::from_path(PathBuf::from("db")).expect("unreachable db dir"); - let log_dir = directories::ProjectDirs::from_path(PathBuf::from("logs/apps")) - .expect("unreachable logs dir"); - let export_dir = directories::ProjectDirs::from_path(PathBuf::from("export")) - .expect("unreachable export dir"); - let db_path = db_dir.data_dir().to_str().expect("unreachable db path"); - let log_path = log_dir.data_dir().to_str().expect("unreachable log path"); - let export_path = export_dir - .data_dir() - .to_str() - .expect("unreachable export path"); - ConfConfig::builder() .set_default("ingest_srv_addr", DEFAULT_INGEST_SRV_ADDR) .expect("valid address") @@ -166,14 +129,8 @@ fn default_config_builder() -> ConfigBuilder { .expect("valid address") .set_default("graphql_srv_addr", DEFAULT_GRAPHQL_SRV_ADDR) .expect("local address") - .set_default("data_dir", db_path) - .expect("data dir") .set_default("retention", DEFAULT_RETENTION) .expect("retention") - .set_default("log_dir", log_path) - .expect("log dir") - .set_default("export_dir", export_path) - .expect("export_dir") .set_default("max_open_files", DEFAULT_MAX_OPEN_FILES) .expect("default max open files") .set_default("max_mb_of_level_base", DEFAULT_MAX_MB_OF_LEVEL_BASE) diff --git a/src/storage.rs b/src/storage.rs index 4b397917..cf30789b 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -7,8 +7,9 @@ use std::{ marker::PhantomData, ops::Deref, path::{Path, PathBuf}, + process::exit, sync::{Arc, Mutex}, - time::Duration, + time::{Duration, Instant}, }; use anyhow::{Context, Result}; @@ -41,6 +42,7 @@ use tracing::{debug, error, info, warn}; use crate::{ graphql::{NetworkFilter, RawEventFilter, TIMESTAMP_SIZE}, ingest::implement::EventFilter, + to_hms, }; const RAW_DATA_COLUMN_FAMILY_NAMES: [&str; 39] = [ @@ -1083,3 +1085,46 @@ pub(crate) fn rocksdb_options(db_options: &DbOptions) -> (Options, Options) { pub(crate) fn data_dir_to_db_path(data_dir: &Path) -> PathBuf { data_dir.join("db") } + +pub fn db_path_and_option( + data_dir: &Path, + max_open_files: i32, + max_mb_of_level_base: u64, + num_of_thread: i32, + max_sub_compactions: u32, +) -> (PathBuf, DbOptions) { + let db_path = data_dir_to_db_path(data_dir); + let db_options = DbOptions::new( + max_open_files, + max_mb_of_level_base, + num_of_thread, + max_sub_compactions, + ); + (db_path, db_options) +} + +pub fn repair_db( + data_dir: &Path, + max_open_files: i32, + max_mb_of_level_base: u64, + num_of_thread: i32, + max_sub_compactions: u32, +) { + let (db_path, db_options) = db_path_and_option( + data_dir, + max_open_files, + max_mb_of_level_base, + num_of_thread, + max_sub_compactions, + ); + let start = Instant::now(); + let (db_opts, _) = rocksdb_options(&db_options); + info!("repair db start."); + match DB::repair(&db_opts, db_path) { + Ok(()) => info!("repair ok"), + Err(e) => error!("repair error: {e}"), + } + let dur = start.elapsed(); + info!("{}", to_hms(dur)); + exit(0); +} diff --git a/src/web.rs b/src/web.rs index a55de705..0ca1d88d 100644 --- a/src/web.rs +++ b/src/web.rs @@ -1,26 +1,27 @@ use std::{convert::Infallible, net::SocketAddr, sync::Arc}; -use async_graphql::http::{playground_source, GraphQLPlaygroundConfig}; +use async_graphql::{ + http::{playground_source, GraphQLPlaygroundConfig}, + Executor, +}; use tokio::{sync::Notify, task}; use tracing::info; use warp::{http::Response as HttpResponse, Filter}; -use crate::graphql::Schema; - /// Runs the GraphQL server. /// /// Note that `key` is not compatible with the DER-encoded key extracted by /// rustls-pemfile. #[allow(clippy::unused_async)] -pub async fn serve( - schema: Schema, +pub async fn serve( + schema: S, addr: SocketAddr, cert: Vec, key: Vec, notify_shutdown: Arc, ) { let filter = async_graphql_warp::graphql(schema).and_then( - |(schema, request): (Schema, async_graphql::Request)| async move { + |(schema, request): (S, async_graphql::Request)| async move { let resp = schema.execute(request).await; Ok::<_, Infallible>(async_graphql_warp::GraphQLResponse::from(resp))