diff --git a/iml-agent/src/action_plugins/lustre/snapshot.rs b/iml-agent/src/action_plugins/lustre/snapshot.rs index 3f2bdd699f..c0b89f470d 100644 --- a/iml-agent/src/action_plugins/lustre/snapshot.rs +++ b/iml-agent/src/action_plugins/lustre/snapshot.rs @@ -4,8 +4,62 @@ use crate::{agent_error::ImlAgentError, device_scanner_client, lustre::lctl}; use chrono::{DateTime, TimeZone, Utc}; +use futures::{future::try_join_all, TryFutureExt}; use iml_wire_types::snapshot::{Create, Destroy, List, Mount, Snapshot, Unmount}; +async fn get_snapshot_fsname_and_status<'a>( + snapshot_name: String, + target: &'a str, + filesystem_name: String, + create_time: DateTime, + modify_time: DateTime, + comment: Option, + mounts: &[device_types::mount::Mount], +) -> Result< + Option<( + String, + String, + DateTime, + DateTime, + Option, + String, + bool, + )>, + ImlAgentError, +> { + let snapshot_fsname = get_snapshot_label(&snapshot_name, &target).await?; + + let snapshot_fsname = if let Some(name) = snapshot_fsname { + name + } else { + return Ok(None); + }; + + let mounted: bool = mounts + .iter() + .filter(|x| x.opts.0.split(',').any(|x| x == "nomgs")) + .filter(|x| x.fs_type.0 == "lustre") + .find_map(|x| { + let s = x.opts.0.split(',').find(|x| x.starts_with("svname="))?; + + let s = s.split('=').nth(1)?.split('-').next()?; + + Some(s.to_string()) + }) + .map(|x| x == snapshot_fsname) + .unwrap_or_else(|| false); + + Ok(Some(( + snapshot_name, + filesystem_name, + create_time, + modify_time, + comment, + snapshot_fsname, + mounted, + ))) +} + pub async fn list(l: List) -> Result, ImlAgentError> { let mut args = vec!["--device", &l.target, "snapshot", "-l"]; if let Some(name) = &l.name { @@ -18,37 +72,81 @@ pub async fn list(l: List) -> Result, ImlAgentError> { return Ok(vec![]); } - let mounts = device_scanner_client::get_mounts().await?; + let fs_name = &l.target.split('-').next().map(String::from); - for snap in parse_device_snapshots(stdout) { - let label = get_snapshot_label(&snap.0, &l.target).await?; + let filesystem_name = if let Some(name) = fs_name { + name.to_string() + } else { + return Ok(vec![]); + }; - if label.is_none() { - continue; - } + let mounts = device_scanner_client::get_mounts().await?; - mounts - .iter() - .filter(|x| x.opts.0.split(',').any(|x| x == "nomgs")) - .filter(|x| x.fs_type.0 == "lustre") - .find_map(|x| { - let s = x.opts.0.split(',').find(|x| x.starts_with("svname="))?; + let xs = parse_device_snapshots(stdout).into_iter().map( + |(snapshot_name, create_time, modify_time, comment)| { + let filesystem_name = filesystem_name.to_string(); + + let xs = get_snapshot_fsname_and_status( + snapshot_name.to_string(), + &l.target, + filesystem_name.to_string(), + create_time, + modify_time, + comment, + &mounts, + ) + .map_ok(|x| { + x.map( + |( + snapshot_name, + filesystem_name, + create_time, + modify_time, + comment, + snapshot_fsname, + mounted, + )| Snapshot { + filesystem_name, + snapshot_name, + create_time, + modify_time, + comment, + snapshot_fsname, + mounted, + }, + ) + }); - let s = s.split('=').nth(1)?; + xs + }, + ); - Some(s.to_string()) - }); - } + let snapshots = try_join_all(xs) + .await? + .into_iter() + .filter_map(|x| x) + .collect::>(); - return Ok(vec![]); + return Ok(snapshots); } -async fn get_snapshot_label(name: &str, target: &str) -> Result, ImlAgentError> { - let x = lctl(vec!["--device", target, "snapshot", "--get_label", name]).await?; - - match x.trim() { - "" => Ok(None), - _ => Ok(Some(x)), +async fn get_snapshot_label( + snapshot_name: &str, + target: &str, +) -> Result, ImlAgentError> { + let x = lctl(vec![ + "--device", + target, + "snapshot", + "--get_label", + snapshot_name, + ]) + .await?; + let x = x.trim().split('-').next(); + + match x { + None => Ok(None), + Some(label) => Ok(Some(label.to_string())), } } diff --git a/iml-agent/src/daemon_plugins/snapshot.rs b/iml-agent/src/daemon_plugins/snapshot.rs index a720a935cc..6ecc273ea4 100644 --- a/iml-agent/src/daemon_plugins/snapshot.rs +++ b/iml-agent/src/daemon_plugins/snapshot.rs @@ -12,17 +12,22 @@ use crate::{ action_plugins::lustre::snapshot, agent_error::ImlAgentError, daemon_plugins::{DaemonPlugin, Output}, - lustre::{lctl, list_mdt0s}, + device_scanner_client, + lustre::list_mdt0s, }; use async_trait::async_trait; use futures::{ - future::{join_all, AbortHandle, Abortable}, + future::{try_join_all, AbortHandle, Abortable}, lock::Mutex, Future, FutureExt, }; use iml_wire_types::snapshot::{List, Snapshot}; -use std::collections::BTreeSet; -use std::{pin::Pin, sync::Arc, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + pin::Pin, + sync::Arc, + time::Duration, +}; use tokio::time::delay_for; struct State { @@ -46,48 +51,70 @@ pub(crate) fn create() -> SnapshotList { } } -async fn list() -> Result, ()> { - let xs = list_mdt0s().await; +async fn list() -> Result>>, ImlAgentError> { + // list the mounts + // remove any mdt0's that are snapshots + let snapshot_mounts = device_scanner_client::get_snapshot_mounts() + .await? + .into_iter() + .map(|x| { + let s = x + .opts + .0 + .split(',') + .find(|x| x.starts_with("svname=")) + .unwrap_or_else(|| ""); + + let s = s.split('=').nth(1).unwrap_or_else(|| ""); + + s.to_string() + }) + .filter(|x| x != "") + .collect::>(); + tracing::debug!("snapshot mounts {:?}", snapshot_mounts); + + let xs = list_mdt0s().await.into_iter().collect::>(); + let xs = xs + .difference(&snapshot_mounts) + .map(|x| x.to_string()) + .collect::>(); tracing::debug!("mdt0s: {:?}", &xs); - xs.into_iter().map(|x| async move {}); - - Ok(vec![]) - - // let futs = fss.into_iter().map(|fs| async move { - // snapshot::list(List { - // fsname: fs.clone(), - // name: None, - // }) - // .await - // .map_err(|e| (fs, e)) - // }); - - // let (oks, errs): (Vec<_>, Vec<_>) = join_all(futs).await.into_iter().partition(Result::is_ok); - - // let snaps = oks - // .into_iter() - // .map(|x| x.unwrap()) - // .flatten() - // .collect::>(); - - // let snapshot_fsnames = snaps - // .iter() - // .map(|s| &s.snapshot_fsname) - // .collect::>(); - - // let really_failed_fss = errs - // .into_iter() - // .map(|x| x.unwrap_err()) - // .filter(|x| !snapshot_fsnames.contains(&x.0)) - // .collect::>(); - - // if !really_failed_fss.is_empty() { - // // XXX debug because of false positives - // tracing::debug!("listing failed: {:?}", really_failed_fss); - // } - // Ok(snaps) + if xs.is_empty() { + tracing::debug!("No mdt0's. Returning none."); + return Ok(None); + } + + let xs = xs.into_iter().map(|x| async move { + let fs_name = x.split('-').next(); + tracing::debug!("fs_name is: {:?}", fs_name); + + if let Some(fs_name) = fs_name { + snapshot::list(List { + target: x.to_string(), + name: None, + }) + .await + .map(|x| (fs_name.to_string(), x)) + .map_err(|x| { + tracing::debug!("Error calling snapshot list: {:?}", x); + x + }) + } else { + tracing::debug!("No fs_name. Returning MarkerNotFound error."); + Err(ImlAgentError::MarkerNotFound) + } + }); + + let snapshots = try_join_all(xs) + .await? + .into_iter() + .collect::>>(); + + tracing::debug!("snapshots: {:?}", snapshots); + + Ok(Some(snapshots)) } #[async_trait] @@ -105,14 +132,20 @@ impl DaemonPlugin for SnapshotList { async move { loop { if let Ok(snapshots) = list().await { - tracing::debug!("snapshots ({}): {:?}", snapshots.len(), &snapshots); + let output = snapshots + .map(|xs| serde_json::to_value(xs)) + .transpose() + .unwrap(); - let output = serde_json::to_value(snapshots).map(Some).unwrap(); let mut lock = state.lock().await; + if lock.output != output { + tracing::debug!("Snapshot output changed. Updating."); lock.output = output; lock.updated = true; } + } else { + tracing::debug!("Error calling list(). Not processing snapshots."); } delay_for(Duration::from_secs(10)).await; } diff --git a/iml-gui/crate/src/page/snapshot/list.rs b/iml-gui/crate/src/page/snapshot/list.rs index 42025d963b..ce13596605 100644 --- a/iml-gui/crate/src/page/snapshot/list.rs +++ b/iml-gui/crate/src/page/snapshot/list.rs @@ -135,9 +135,8 @@ pub fn view(model: &Model, cache: &ArcCache) -> Node { x.comment.as_deref().unwrap_or("---") ], table::td_center(plain![match &x.mounted { - Some(true) => "mounted", - Some(false) => "unmounted", - None => "unknown", + true => "mounted", + false => "unmounted", }]), ] })] diff --git a/iml-manager-cli/src/display_utils.rs b/iml-manager-cli/src/display_utils.rs index 370786e8e2..4fbaaa060c 100644 --- a/iml-manager-cli/src/display_utils.rs +++ b/iml-manager-cli/src/display_utils.rs @@ -126,9 +126,8 @@ impl IntoTable for Vec { s.snapshot_name, s.create_time.to_rfc2822(), match s.mounted { - Some(true) => "mounted", - Some(false) => "unmounted", - None => "---", + true => "mounted", + false => "unmounted", } .to_string(), s.comment.unwrap_or_else(|| "---".to_string()), diff --git a/iml-services/iml-snapshot/src/main.rs b/iml-services/iml-snapshot/src/main.rs index b197d6fee9..d33c580b93 100644 --- a/iml-services/iml-snapshot/src/main.rs +++ b/iml-services/iml-snapshot/src/main.rs @@ -26,7 +26,8 @@ async fn main() -> Result<(), Box> { let ch = iml_rabbit::create_channel(&conn).await?; - let mut s = consume_data::>(&ch, "rust_agent_snapshot_rx"); + let mut s = + consume_data::>>(&ch, "rust_agent_snapshot_rx"); let pool = get_db_pool(get_pool_limit().unwrap_or(DEFAULT_POOL_LIMIT)).await?; let pool_2 = pool.clone(); @@ -60,38 +61,40 @@ async fn main() -> Result<(), Box> { pool_3.clone(), )); - while let Some((fqdn, snapshots)) = s.try_next().await? { - tracing::debug!("snapshots from {}: {:?}", fqdn, snapshots); - - let snaps = snapshots.into_iter().fold( - (vec![], vec![], vec![], vec![], vec![], vec![], vec![]), - |mut acc, s| { - acc.0.push(s.filesystem_name); - acc.1.push(s.snapshot_name); - acc.2.push(s.create_time.naive_utc()); - acc.3.push(s.modify_time.naive_utc()); - acc.4.push(s.snapshot_fsname.clone()); - acc.5.push(s.mounted); - acc.6.push(s.comment); - - acc - }, - ); - - let mut transaction = pool.begin().await?; - - sqlx::query!( - r#" - DELETE FROM snapshot - WHERE (filesystem_name, snapshot_name) NOT IN (SELECT * FROM UNNEST ($1::text[], $2::text[])) - "#, - &snaps.0, - &snaps.1, - ) - .execute(&mut transaction) - .await?; + while let Some((fqdn, snap_map)) = s.try_next().await? { + for (fs_name, snapshots) in snap_map { + tracing::debug!("snapshots from {}: {:?}", fqdn, snapshots); + + let snaps = snapshots.into_iter().fold( + (vec![], vec![], vec![], vec![], vec![], vec![], vec![]), + |mut acc, s| { + acc.0.push(s.filesystem_name); + acc.1.push(s.snapshot_name); + acc.2.push(s.create_time.naive_utc()); + acc.3.push(s.modify_time.naive_utc()); + acc.4.push(s.snapshot_fsname.clone()); + acc.5.push(s.mounted); + acc.6.push(s.comment); + + acc + }, + ); + + let mut transaction = pool.begin().await?; + + sqlx::query!( + r#" + DELETE FROM snapshot + WHERE snapshot_name NOT IN (SELECT * FROM UNNEST ($1::text[])) + AND filesystem_name=$2::text + "#, + &snaps.1, + &fs_name, + ) + .execute(&mut transaction) + .await?; - sqlx::query!( + sqlx::query!( r#" INSERT INTO snapshot (filesystem_name, snapshot_name, create_time, modify_time, snapshot_fsname, mounted, comment) SELECT * FROM @@ -117,13 +120,14 @@ async fn main() -> Result<(), Box> { &snaps.2, &snaps.3, &snaps.4, - &snaps.5 as &[Option], + &snaps.5, &snaps.6 as &[Option], ) .execute(&mut transaction) .await?; - transaction.commit().await?; + transaction.commit().await?; + } } Ok(()) diff --git a/iml-wire-types/src/snapshot.rs b/iml-wire-types/src/snapshot.rs index f9ee72d700..3bf2344f3c 100644 --- a/iml-wire-types/src/snapshot.rs +++ b/iml-wire-types/src/snapshot.rs @@ -33,7 +33,7 @@ pub struct Snapshot { pub snapshot_fsname: String, pub modify_time: DateTime, pub create_time: DateTime, - pub mounted: Option, + pub mounted: bool, /// Optional comment for the snapshot pub comment: Option, } @@ -46,7 +46,7 @@ pub struct SnapshotRecord { pub modify_time: DateTime, pub create_time: DateTime, pub snapshot_fsname: String, - pub mounted: Option, + pub mounted: bool, pub comment: Option, } diff --git a/migrations/20201106182518_snapshot_mounted.sql b/migrations/20201106182518_snapshot_mounted.sql new file mode 100644 index 0000000000..4010c628de --- /dev/null +++ b/migrations/20201106182518_snapshot_mounted.sql @@ -0,0 +1,3 @@ +UPDATE snapshot SET mounted = false WHERE mounted is NULL; + +ALTER TABLE IF EXISTS snapshot ALTER COLUMN mounted SET NOT NULL; diff --git a/sqlx-data.json b/sqlx-data.json index e40b57e953..1c2d809346 100644 --- a/sqlx-data.json +++ b/sqlx-data.json @@ -13,6 +13,19 @@ "nullable": [] } }, + "00ade0264fae96a50e1d496a0affee5d9b3f29a433191e08379205b331685da2": { + "query": "\n DELETE FROM snapshot\n WHERE snapshot_name NOT IN (SELECT * FROM UNNEST ($1::text[]))\n AND filesystem_name=$2::text\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "TextArray", + "Text" + ] + }, + "nullable": [] + } + }, "044c83becc9a4280aa888bab7106a2fb5501c1a205830e57010416e1aaeae1d3": { "query": "\n SELECT\n (n.id).name AS \"name!\",\n (n.id).id AS \"id!\",\n cluster_id,\n online,\n standby,\n standby_onfail,\n maintenance,\n pending,\n unclean,\n shutdown,\n expected_up,\n is_dc,\n resources_running,\n type\n FROM corosync_node n\n ORDER BY\n CASE WHEN $1 = 'ASC' THEN n.id END ASC,\n CASE WHEN $1 = 'DESC' THEN n.id END DESC\n OFFSET $2 LIMIT $3", "describe": { @@ -725,7 +738,7 @@ false, false, false, - true, + false, true ] } @@ -1976,19 +1989,6 @@ "nullable": [] } }, - "628b472cfdf32ba54540531b117c31b3787c03b05ddf57e03cdf840c71155bde": { - "query": "\n DELETE FROM snapshot\n WHERE (filesystem_name, snapshot_name) NOT IN (SELECT * FROM UNNEST ($1::text[], $2::text[]))\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "TextArray", - "TextArray" - ] - }, - "nullable": [] - } - }, "681d997bb965a228c0aa75d93d09faefe5412daaf3e7bda3630df319fb9edabb": { "query": "select * from django_content_type", "describe": { @@ -2947,7 +2947,7 @@ false, false, false, - true, + false, true ] } @@ -3714,7 +3714,7 @@ false, false, false, - true, + false, true ] }