diff --git a/Cargo.lock b/Cargo.lock index 2d110d8..2d9df6f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2663,6 +2663,17 @@ dependencies = [ "winreg", ] +[[package]] +name = "ron" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86018df177b1beef6c7c8ef949969c4f7cb9a9344181b92486b23c79995bdaa4" +dependencies = [ + "base64 0.13.0", + "bitflags", + "serde", +] + [[package]] name = "route-recognizer" version = "0.2.0" @@ -2680,6 +2691,18 @@ dependencies = [ "serde", ] +[[package]] +name = "rustbreak" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "460d97902465327d69ecfe8cefdb5972c6f94d6127ac9e992acdb51458bebc27" +dependencies = [ + "ron", + "serde", + "tempfile", + "thiserror", +] + [[package]] name = "rustc-hash" version = "1.1.0" @@ -2936,9 +2959,9 @@ checksum = "42a568c8f2cd051a4d283bd6eb0343ac214c1b0f1ac19f93e1175b2dee38c73d" [[package]] name = "signal-hook" -version = "0.3.9" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "470c5a6397076fae0094aaf06a08e6ba6f37acb77d3b1b91ea92b4d6c8650c39" +checksum = "a253b5e89e2698464fc26b545c9edceb338e18a89effeeecfea192c3025be29d" dependencies = [ "libc", "signal-hook-registry", @@ -3555,6 +3578,15 @@ dependencies = [ "serde", ] +[[package]] +name = "uuid" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f" +dependencies = [ + "getrandom 0.2.3", +] + [[package]] name = "value-bag" version = "1.0.0-alpha.7" @@ -3905,9 +3937,11 @@ dependencies = [ "rand 0.7.3", "redis", "reed-solomon-erasure", + "rustbreak", "serde", "serde_json", "sha-1", + "signal-hook", "simple_logger", "snap", "structopt", @@ -3916,4 +3950,5 @@ dependencies = [ "tokio", "tokio-util", "toml", + "uuid", ] diff --git a/zstor/Cargo.toml b/zstor/Cargo.toml index 368728f..7e61e7d 100644 --- a/zstor/Cargo.toml 
+++ b/zstor/Cargo.toml @@ -1,9 +1,9 @@ [package] -name = "zstor_v2" -version = "0.3.0-rc.12" authors = ["The Threefold Tech developers "] edition = "2021" +name = "zstor_v2" repository = "https://github.com/threefoldtech/0-stor_v2" +version = "0.3.0-rc.12" [[bin]] name = "zstor_v2" @@ -14,50 +14,53 @@ name = "test-zdb" path = "src/test_zdb.rs" [[bench]] -name = "aes_encryptor" harness = false +name = "aes_encryptor" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] vendored = ["openssl"] [dependencies] -reed-solomon-erasure = "4" -rand = "0.7" -serde = { version = "1.0", features = ["derive"] } -toml = "0.5" -hex = "0.4" -aes-gcm = { version = "0.9", features = ["std"] } -snap = "1" -log = { version = "0.4", features = ["release_max_level_debug"] } -redis = { version = "0.20", default-features = false, features = ["aio", "tokio-comp", "connection-manager"] } -sha-1 = "0.9" +actix = "0.12" +actix-rt = "2.2" +aes-gcm = {version = "0.9", features = ["std"]} +async-trait = "0.1" +bincode = "1" +blake2 = "0.9" +chrono = "0.4" +futures = "0.3" +gray-codes = "0.1" +grid_explorer_client = {path = "../grid_explorer_client"} +hex = "0.4" +log = {version = "0.4", features = ["release_max_level_debug"]} +log4rs = {version = "1", default-features = false, features = ["rolling_file_appender", "threshold_filter", "fixed_window_roller", "size_trigger", "compound_policy"]} +nix = "0.22" +openssl = {version = "0.10", features = ["vendored"], optional = true} +path-clean = "0.1.0" +pretty_env_logger = "0.4" +prometheus = "0.12" +rand = "0.7" +redis = {version = "0.20", default-features = false, features = ["aio", "tokio-comp", "connection-manager"]} +reed-solomon-erasure = "4" +rustbreak = {version = "2.0.0", features = ["ron_enc"]} +serde = {version = "1.0", features = ["derive"]} +serde_json = "1" +sha-1 = "0.9" +signal-hook = "0.3.14" simple_logger = "1.11" # TODO: remove this -pretty_env_logger = "0.4" -structopt = "0.3" 
-tokio = { version = "1", features = ["rt", "macros", "fs"] } -futures = "0.3" -blake2 = "0.9" -gray-codes = "0.1" -log4rs = { version = "1", default-features = false, features = ["rolling_file_appender", "threshold_filter", "fixed_window_roller", "size_trigger", "compound_policy"] } -async-trait = "0.1" -bincode = "1" -openssl = { version = "0.10", features = ["vendored"], optional = true } -actix = "0.12" -actix-rt = "2.2" -tokio-util = "0.6" -grid_explorer_client = { path = "../grid_explorer_client" } -chrono = "0.4" -serde_json = "1" -prometheus = "0.12" -tide = "0.16" -nix = "0.22" -path-clean = "0.1.0" +snap = "1" +structopt = "0.3" +tide = "0.16" +tokio = {version = "1", features = ["rt", "macros", "fs"]} +tokio-util = "0.6" +toml = "0.5" +uuid = {version = "1.1.2", features = ["v4"]} [build-dependencies] bindgen = "0.59" [dev-dependencies] -rand = "0.7" criterion = "0.3" -testutils = { path = "testutils" } \ No newline at end of file +rand = "0.7" +testutils = {path = "testutils"} diff --git a/zstor/src/actors/zstor.rs b/zstor/src/actors/zstor.rs index 167e56a..02b91a7 100644 --- a/zstor/src/actors/zstor.rs +++ b/zstor/src/actors/zstor.rs @@ -64,6 +64,21 @@ pub struct Store { pub blocking: bool, } +/// A DB that implements this trait can persist Store objects to disk +pub trait StorePersist { + /// Error returned when persisting fails + type Error; + /// Insert a Store + /// Returns the store's id + fn insert(&self, store: Store) -> u128; + /// Delete the Store with the id returned by insert + fn delete(&self, id: u128); + /// Return the content as a vector without consuming the handle + fn vectored_content(&self) -> Vec; + /// Persist the content + fn save(&self) -> Result<(), Self::Error>; +} + #[derive(Serialize, Deserialize, Debug, Message, Clone)] #[rtype(result = "Result<(), ZstorError>")] /// Message for the retrieve command of zstor. 
diff --git a/zstor/src/actors/zstor_scheduler.rs b/zstor/src/actors/zstor_scheduler.rs index 8e694c0..3103801 100644 --- a/zstor/src/actors/zstor_scheduler.rs +++ b/zstor/src/actors/zstor_scheduler.rs @@ -1,11 +1,18 @@ +use std::collections::HashMap; +use std::path::Path; + use super::priority_queue::PriorityQueue; -use super::zstor::{Check, Rebuild, Retrieve, Store, ZstorActor, ZstorCommand}; +use super::zstor::{Check, Rebuild, Retrieve, Store, StorePersist, ZstorActor, ZstorCommand}; use crate::{meta::Checksum, ZstorError, ZstorErrorKind}; use actix::prelude::*; use log::{debug, error}; +use rustbreak::backend::FileBackend; +use rustbreak::deser::Ron; +use rustbreak::{Database, FileDatabase, RustbreakError}; use tokio::sync::{mpsc, oneshot}; const LEVELS: u16 = 3; use tokio::time; +use uuid::Uuid; #[derive(Debug, Message)] #[rtype(result = "Result<(), ZstorError>")] @@ -52,7 +59,7 @@ pub enum ZstorSchedulerResponse { Done, } /// Worker for procesing the scheduler messages -pub struct Looper { +struct Looper { zstor: Addr, cmds: PriorityQueue, ch: mpsc::UnboundedReceiver, @@ -150,16 +157,85 @@ impl Looper { } } +/// This DB works in memory until the save() function is called +pub struct StoreDb { + db: Database, FileBackend, Ron>, +} + +impl StoreDb { + /// Create a new StoreDb; + /// if the path exists, load it + pub fn new_or_load_from>(path: P) -> Self { + let db = FileDatabase::, Ron>::load_from_path_or_default(path) + .expect("can't open database file for saving stores"); + + Self { db } + } +} + +impl StorePersist for StoreDb { + type Error = RustbreakError; + + fn insert(&self, store: Store) -> u128 { + let id = Uuid::new_v4().as_u128(); + let result = self.db.write(|db| { + db.insert(id, store); + }); + if let Err(err) = result { + log::debug!("failed to insert store into store db: '{}'", err); + } + id + } + + fn delete(&self, id: u128) { + let result = self.db.write(|db| db.remove(&id)); + + if let Err(err) = result { + log::debug!("failed to remove store 
from store db: '{}'", err); + } + } + + fn save(&self) -> Result<(), Self::Error> { + self.db.save() + } + + fn vectored_content(&self) -> Vec { + let data = self + .db + .read(|db| { + let stores = { db.values().into_iter().map(|v| v.to_owned()).collect() }; + stores + }) + .unwrap(); + + self.db + .write(|db| { + db.clear(); + }) + .unwrap(); + + data + } +} + /// Actor for the main zstor object encoding and decoding. -pub struct ZstorActorScheduler { +pub struct ZstorActorScheduler { zstor: Addr, ch: Option>, + stores: D, } -impl ZstorActorScheduler { +impl ZstorActorScheduler +where + D: StorePersist, +{ /// new - pub fn new(zstor: Addr) -> ZstorActorScheduler { - Self { zstor, ch: None } + pub fn new(zstor: Addr, store_db: D) -> ZstorActorScheduler { + Self { + zstor, + ch: None, + stores: store_db, + } } fn push_zstor( @@ -194,7 +270,10 @@ impl ZstorActorScheduler { } } -impl Actor for ZstorActorScheduler { +impl Actor for ZstorActorScheduler +where + D: std::marker::Unpin + 'static, +{ type Context = Context; fn started(&mut self, ctx: &mut Self::Context) { @@ -205,12 +284,18 @@ impl Actor for ZstorActorScheduler { } } -impl Handler for ZstorActorScheduler { +impl Handler for ZstorActorScheduler +where + D: 'static + StorePersist + std::marker::Unpin, +{ type Result = ResponseFuture>; fn handle(&mut self, msg: Store, _: &mut Self::Context) -> Self::Result { let ch = self.ch.as_ref().unwrap().clone(); - Box::pin(async move { + + let id = self.stores.insert(msg.clone()); + + let ret = Box::pin(async move { let blocking = msg.blocking; let resp_fut = Self::push_zstor(ch, ZstorCommand::Store(msg), blocking)?; if !blocking { @@ -226,11 +311,18 @@ impl Handler for ZstorActorScheduler { format!("received {:?} while expecting a store response", resp), )), } - }) + }); + + self.stores.delete(id); + + ret } } -impl Handler for ZstorActorScheduler { +impl Handler for ZstorActorScheduler +where + D: 'static + StorePersist + std::marker::Unpin, +{ type Result = 
ResponseFuture>; fn handle(&mut self, msg: Retrieve, _: &mut Self::Context) -> Self::Result { @@ -252,7 +344,10 @@ impl Handler for ZstorActorScheduler { } } -impl Handler for ZstorActorScheduler { +impl Handler for ZstorActorScheduler +where + D: 'static + StorePersist + std::marker::Unpin, +{ type Result = ResponseFuture>; fn handle(&mut self, msg: Rebuild, _: &mut Self::Context) -> Self::Result { @@ -274,7 +369,10 @@ impl Handler for ZstorActorScheduler { } } -impl Handler for ZstorActorScheduler { +impl Handler for ZstorActorScheduler +where + D: 'static + StorePersist + std::marker::Unpin, +{ type Result = ResponseFuture, ZstorError>>; fn handle(&mut self, msg: Check, _: &mut Self::Context) -> Self::Result { @@ -296,12 +394,15 @@ impl Handler for ZstorActorScheduler { } } -impl Handler for ZstorActorScheduler { +impl Handler for ZstorActorScheduler +where + D: 'static + StorePersist + std::marker::Unpin, +{ type Result = ResponseFuture>; fn handle(&mut self, _: Signaled, _: &mut Self::Context) -> Self::Result { let ch = self.ch.as_ref().unwrap().clone(); - Box::pin(async move { + let ret = Box::pin(async move { let resp = Self::push(ch, ZstorSchedulerCommand::Finalize, true, LEVELS - 1)? 
.await .map_err(|err| { @@ -314,6 +415,12 @@ impl Handler for ZstorActorScheduler { format!("received {:?} while expecting a check response", resp), )), } - }) + }); + + if self.stores.save().is_err() { + error!("Error while saving non-handled stores"); + } + + ret } } diff --git a/zstor/src/main.rs b/zstor/src/main.rs index b30e534..7a23729 100644 --- a/zstor/src/main.rs +++ b/zstor/src/main.rs @@ -19,8 +19,10 @@ use structopt::StructOpt; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::net::{UnixListener, UnixStream}; use zstor_v2::actors::config::ReloadConfig; +use zstor_v2::actors::zstor::StorePersist; use zstor_v2::actors::zstor::{Check, Rebuild, Retrieve, Store, ZstorCommand, ZstorResponse}; use zstor_v2::actors::zstor_scheduler::Signaled; +use zstor_v2::actors::zstor_scheduler::StoreDb; use zstor_v2::actors::zstor_scheduler::ZstorActorScheduler; use zstor_v2::config::Config; use zstor_v2::zdb::SequentialZdb; @@ -28,6 +30,8 @@ use zstor_v2::{ZstorError, ZstorErrorKind, ZstorResult}; const MIB: u64 = 1 << 20; +const STORES_DB_PATH: &str = "storesdb.ron"; + #[derive(StructOpt, Debug)] #[structopt(about = "zstor data encoder")] /// Zstor data encoder @@ -336,7 +340,10 @@ async fn real_main() -> ZstorResult<()> { } } write_pid_file(&pid_path).await?; - let zstor_scheduler = ZstorActorScheduler::new(zstor.clone()).start(); + + let zstordb = StoreDb::new_or_load_from(STORES_DB_PATH); + let old_stores = zstordb.vectored_content(); + let zstor_scheduler = ZstorActorScheduler::new(zstor.clone(), zstordb).start(); let server = if let Some(socket) = cfg.socket() { let _ = fs::remove_file(socket); UnixListener::bind(socket) @@ -358,6 +365,23 @@ async fn real_main() -> ZstorResult<()> { let mut siguserone = actix_rt::signal::unix::signal(SignalKind::user_defined1()) .expect("Failed to install SIGUSR1 handler"); + let mut sigkill = + actix_rt::signal::unix::signal(SignalKind::from_raw(signal_hook::consts::SIGKILL)) + .expect("Failed to 
install SIGKILL handler"); + + // handle old stores + + let zs = zstor_scheduler.clone(); + + tokio::spawn(async move { + debug!("Handling old(persisted) stores"); + for store in old_stores { + if let Err(e) = zs.send(store).await { + error!("Error while handeling old store: {}", e); + } + } + }); + loop { tokio::select! { accepted = server.accept() => { @@ -376,6 +400,22 @@ async fn real_main() -> ZstorResult<()> { } }); } + + _ = sigkill.recv() => { + info!("Save non-handled stores info into persistent db after receiving SIGKILL"); + match zstor_scheduler.send(Signaled{}).await { + Ok(Ok(())) => { + info!("all commands should be fullfilled by now") + }, + Ok(Err(e)) => { + error!("error while waiting for commands to finish {}", e) + }, + Err(e) => { + error!("error sending the signal to the scheduler {}", e) + } + } + break + } _ = sigints.recv() => { info!("Shutting down zstor daemon after receiving SIGINT"); match zstor_scheduler.send(Signaled{}).await { @@ -420,9 +460,10 @@ async fn real_main() -> ZstorResult<()> { Ok(()) } -async fn handle_client(mut con: C, zstor: Addr) -> ZstorResult<()> +async fn handle_client(mut con: C, zstor: Addr>) -> ZstorResult<()> where C: AsyncRead + AsyncWrite + Unpin, + D: 'static + StorePersist + std::marker::Unpin, { // Read the command from the connection let size = con diff --git a/zstor/tests/integration_test.rs b/zstor/tests/integration_test.rs index 940f28c..29b8d4d 100644 --- a/zstor/tests/integration_test.rs +++ b/zstor/tests/integration_test.rs @@ -146,7 +146,7 @@ fn retrieves_more_prior_than_stores() { // it issues a retrieve command. this command should be handled before // the rest of the stores. this is because the zdb file system blocks on it let mut manager = TestManager::new(TestParams { - id: "rmpts".to_string(), + id: "zse".to_string(), network_speed: None, max_zdb_data_dir_size: None, data_disk_size: "20G".into(),