From 9ca30fea2a4cd3ca6ad1812c9d1b6dbe2c338eff Mon Sep 17 00:00:00 2001 From: Igor Pashev Date: Mon, 18 May 2020 14:17:25 +0200 Subject: [PATCH] Monitor client mounts in iml-device Signed-off-by: Igor Pashev --- Cargo.lock | 2 + .../services/lustre_audit/update_scan.py | 40 ----- .../device-scanner-daemon/src/state.rs | 2 +- iml-services/iml-device/Cargo.toml | 2 + iml-services/iml-device/src/main.rs | 159 +++++++++++++++--- 5 files changed, 138 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f0b36c6b15..fac4159a70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1349,9 +1349,11 @@ dependencies = [ name = "iml-device" version = "0.3.0" dependencies = [ + "chrono", "device-types", "diesel", "futures", + "im 13.0.0", "iml-manager-env", "iml-orm", "iml-postgres", diff --git a/chroma_core/services/lustre_audit/update_scan.py b/chroma_core/services/lustre_audit/update_scan.py index a0f375f101..93e53a4e77 100755 --- a/chroma_core/services/lustre_audit/update_scan.py +++ b/chroma_core/services/lustre_audit/update_scan.py @@ -47,7 +47,6 @@ def audit_host(self): self.update_resource_locations() self.update_target_mounts() - self.update_client_mounts() def run(self, host_id, host_data): host = ManagedHost.objects.get(pk=host_id) @@ -125,45 +124,6 @@ def _updates_available(installed_versions, available_versions): log.info("update_packages(%s): updates=%s" % (self.host, updates)) job_scheduler_notify.notify(self.host, self.started_at, {"needs_update": updates}) - def update_client_mounts(self): - # Client mount audit comes in via metrics due to the way the - # ClientAudit is implemented. - try: - client_mounts = self.host_data["metrics"]["raw"]["lustre_client_mounts"] - except KeyError: - client_mounts = [] - - # If lustre_client_mounts is None then nothing changed since the last update and so we can just return. - # Not the same as [] empty list which means no mounts - if client_mounts is None: - return - - expected_fs_mounts = LustreClientMount.objects.filter(host=self.host) - actual_fs_mounts = [m["mountspec"].split(":/")[1] for m in client_mounts] - - # Don't bother with the rest if there's nothing to do. - if len(expected_fs_mounts) == 0 and len(actual_fs_mounts) == 0: - return - - for expected_mount in expected_fs_mounts: - if expected_mount.active and expected_mount.filesystem not in actual_fs_mounts: - update = dict(state="unmounted", mountpoint=None) - job_scheduler_notify.notify(expected_mount, self.started_at, update) - log.info("updated mount %s on %s -> inactive" % (expected_mount.mountpoint, self.host)) - - for actual_mount in client_mounts: - fsname = actual_mount["mountspec"].split(":/")[1] - try: - mount = [m for m in expected_fs_mounts if m.filesystem == fsname][0] - log.debug("mount: %s" % mount) - if not mount.active: - update = dict(state="mounted", mountpoint=actual_mount["mountpoint"]) - job_scheduler_notify.notify(mount, self.started_at, update) - log.info("updated mount %s on %s -> active" % (actual_mount["mountpoint"], self.host)) - except IndexError: - log.info("creating new mount %s on %s" % (actual_mount["mountpoint"], self.host)) - JobSchedulerClient.create_client_mount(self.host, fsname, actual_mount["mountpoint"]) - def update_target_mounts(self): # If mounts is None then nothing changed since the last update and so we can just return. # Not the same as [] empty list which means no mounts diff --git a/device-scanner/device-scanner-daemon/src/state.rs b/device-scanner/device-scanner-daemon/src/state.rs index d74ec00668..bd70c9fdd7 100644 --- a/device-scanner/device-scanner-daemon/src/state.rs +++ b/device-scanner/device-scanner-daemon/src/state.rs @@ -486,7 +486,7 @@ pub fn produce_device_graph(state: &state::State) -> Result { build_device_graph(&mut root, &dev_list, &state.local_mounts)?; - let v = serde_json::to_string(&root)?; + let v = serde_json::to_string(&(&root, &state.local_mounts))?; let b = bytes::BytesMut::from(v + "\n"); Ok(b.freeze()) } diff --git a/iml-services/iml-device/Cargo.toml b/iml-services/iml-device/Cargo.toml index 37d98b0dea..9a82bb49e6 100644 --- a/iml-services/iml-device/Cargo.toml +++ b/iml-services/iml-device/Cargo.toml @@ -5,9 +5,11 @@ authors = ["IML Team "] edition = "2018" [dependencies] +chrono = "0.4" device-types = "0.1" diesel = { version = "1.4", default_features = false, features = ["postgres", "r2d2", "chrono", "serde_json"] } futures = "0.3" +im = { version = "13.0", features = ["serde"] } iml-manager-env = { path = "../../iml-manager-env", version = "0.3" } iml-orm = { path = "../../iml-orm", version = "0.3", features = ["postgres-interop"] } iml-postgres = { path = "../../iml-postgres", version = "0.3" } diff --git a/iml-services/iml-device/src/main.rs b/iml-services/iml-device/src/main.rs index c1e784b94b..22a85e8bc7 100644 --- a/iml-services/iml-device/src/main.rs +++ b/iml-services/iml-device/src/main.rs @@ -2,9 +2,10 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -use device_types::devices::Device; -use diesel::{self, pg::upsert::excluded, prelude::*}; +use device_types::{devices::Device, mount::Mount}; +use diesel::{self, dsl, pg::upsert::excluded, prelude::*, sql_types}; use futures::{lock::Mutex, TryFutureExt, TryStreamExt}; +use im::HashSet; use iml_device::{ linux_plugin_transforms::{ build_device_lookup, devtree2linuxoutput, get_shared_pools, populate_zpool, update_vgs, @@ -13,9 +14,12 @@ use iml_device::{ ImlDeviceError, }; use iml_orm::{ - models::{ChromaCoreDevice, NewChromaCoreDevice}, - schema::chroma_core_device::{devices, fqdn, table}, - tokio_diesel::*, + models::{ChromaCoreDevice, ChromaCoreLustreclientmount, NewChromaCoreDevice}, + schema::{ + chroma_core_device as dvc, chroma_core_lustreclientmount as clmnt, + chroma_core_managedhost as host, django_content_type as djct, + }, + tokio_diesel::{AsyncRunQueryDsl as _, OptionalExtension as _}, DbPool, }; use iml_service_queue::service_queue::consume_data; @@ -29,7 +33,7 @@ use warp::Filter; type Cache = Arc>>; async fn create_cache(pool: &DbPool) -> Result { - let data: HashMap = table + let data: HashMap = dvc::table .load_async(&pool) .await? .into_iter() @@ -45,6 +49,116 @@ async fn create_cache(pool: &DbPool) -> Result { Ok(Arc::new(Mutex::new(data))) } +async fn update_devices( + pool: &DbPool, + host: &Fqdn, + devices: &Device, +) -> Result<(), ImlDeviceError> { + let device_to_insert = NewChromaCoreDevice { + fqdn: host.to_string(), + devices: serde_json::to_value(devices) + .expect("Could not convert incoming Devices to JSON."), + }; + + let new_device = diesel::insert_into(dvc::table) + .values(device_to_insert) + .on_conflict(dvc::fqdn) + .do_update() + .set(dvc::devices.eq(excluded(dvc::devices))) + .get_result_async::(&pool) + .await?; + tracing::info!("Inserted devices from host '{}'", host); + tracing::debug!("Inserted {:?}", new_device); + Ok(()) +} + +async fn update_client_mounts( + pool: &DbPool, + ct_id: Option, + host: &Fqdn, + mounts: &HashSet, +) -> Result<(), ImlDeviceError> { + let host_id = match host::table + .select(host::id) + .filter(host::fqdn.eq(host.to_string())) + .get_result_async::(&pool) + .await + .optional()? + { + Some(id) => id, + None => { + tracing::warn!("Host '{}' is unknown", host); + return Ok(()); + } + }; + + let lustre_mounts = mounts + .into_iter() + .filter(|m| m.fs_type.0 == "lustre") + .filter_map(|m| { + m.source + .0 + .to_str() + .and_then(|p| p.splitn(2, ":/").nth(1)) + .map(|fs| (fs.to_string(), m.target.0.to_str().map(String::from))) + }) + .collect::>(); + + tracing::debug!( + "Client mounts at {}({}): {:?}", + host, + host_id, + &lustre_mounts + ); + + let state_modified_at = chrono::offset::Utc::now().into_sql::(); + + let xs: Vec<_> = lustre_mounts + .into_iter() + .map(|(fs, tg)| { + ( + clmnt::host_id.eq(host_id), + clmnt::filesystem.eq(fs), + clmnt::mountpoint.eq(tg), + clmnt::state.eq("mounted"), + clmnt::state_modified_at.eq(state_modified_at), + clmnt::immutable_state.eq(false), + clmnt::not_deleted.eq(true), + clmnt::content_type_id.eq(ct_id), + ) + }) + .collect(); + + let existing_mounts = diesel::insert_into(clmnt::table) + .values(xs) + .on_conflict((clmnt::host_id, clmnt::filesystem, clmnt::not_deleted)) + .do_update() + .set(( + clmnt::mountpoint.eq(excluded(clmnt::mountpoint)), + clmnt::state.eq(excluded(clmnt::state)), + clmnt::state_modified_at.eq(state_modified_at), + )) + .get_results_async::(&pool) + .await?; + + let fs_names: Vec = existing_mounts.into_iter().map(|c| c.filesystem).collect(); + + let updated = diesel::update(clmnt::table) + .filter(clmnt::filesystem.ne(dsl::all(fs_names))) + .filter(clmnt::host_id.eq(host_id)) + .set(( + clmnt::mountpoint.eq(Option::::None), + clmnt::state.eq("unmounted".as_sql::()), + clmnt::state_modified_at.eq(state_modified_at), + )) + .get_results_async::(&pool) + .await?; + + tracing::debug!("Updated client mounts: {:?}", updated); + + Ok(()) +} + #[tokio::main] async fn main() -> Result<(), ImlDeviceError> { iml_tracing::init(); @@ -112,29 +226,22 @@ async fn main() -> Result<(), ImlDeviceError> { let ch = iml_rabbit::create_channel(&conn).await?; - let mut s = consume_data::(&ch, "rust_agent_device_rx"); + let mut s = consume_data::<(Device, HashSet)>(&ch, "rust_agent_device_rx"); - while let Some((f, d)) = s.try_next().await? { - let mut cache = cache2.lock().await; - - cache.insert(f.clone(), d.clone()); + // Django's artifact: + let lustreclientmount_ct_id = djct::table + .select(djct::id) + .filter(djct::model.eq("lustreclientmount")) + .first_async::(&pool) + .await + .optional()?; - let device_to_insert = NewChromaCoreDevice { - fqdn: f.to_string(), - devices: serde_json::to_value(d).expect("Could not convert incoming Devices to JSON."), - }; - - let new_device = diesel::insert_into(table) - .values(device_to_insert) - .on_conflict(fqdn) - .do_update() - .set(devices.eq(excluded(devices))) - .get_result_async::(&pool) - .await - .expect("Error saving new device"); + while let Some((host, (devices, mounts))) = s.try_next().await? { + let mut cache = cache2.lock().await; + cache.insert(host.clone(), devices.clone()); - tracing::info!("Inserted device from host {}", new_device.fqdn); - tracing::trace!("Inserted device {:?}", new_device); + update_devices(&pool, &host, &devices).await?; + update_client_mounts(&pool, lustreclientmount_ct_id, &host, &mounts).await?; } Ok(())