This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Commit

Monitor client mounts in iml-device
Note: this requires the corresponding change in device-scanner-daemon to send the local
mounts along with the devices (as a tuple)

Signed-off-by: Igor Pashev <pashev.igor@gmail.com>
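
For orientation (not part of the commit): a minimal, self-contained sketch of the payload shape the note above describes, using stand-in types. In the actual code the tuple is (device_types::devices::Device, im::HashSet<device_types::mount::Mount>) — the real Mount wraps its fields in newtypes, hence the .0 accesses in the diff below — and iml-device consumes it via consume_data::<(Device, HashSet<Mount>)>("rust_agent_device_rx").

// Sketch only: stand-in types illustrating the (devices, mounts) tuple sent per host.
use std::collections::HashSet;

#[derive(Debug, Clone)]
struct Device; // stand-in for device_types::devices::Device

#[derive(Debug, Clone, Hash, PartialEq, Eq)]
struct Mount {
    source: String,  // e.g. "10.0.0.1@tcp:/testfs"
    target: String,  // e.g. "/mnt/testfs"
    fs_type: String, // e.g. "lustre"
}

// The agent previously sent only the device tree; it now sends the device
// tree and the host's local mounts together as one tuple.
type Payload = (Device, HashSet<Mount>);

fn main() {
    let mounts: HashSet<Mount> = vec![Mount {
        source: "10.0.0.1@tcp:/testfs".to_string(),
        target: "/mnt/testfs".to_string(),
        fs_type: "lustre".to_string(),
    }]
    .into_iter()
    .collect();

    let payload: Payload = (Device, mounts);
    println!("{:?}", payload);
}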
ip1981 committed Jun 4, 2020
1 parent f0e27cc commit 725934e
Showing 4 changed files with 137 additions and 66 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

40 changes: 0 additions & 40 deletions chroma_core/services/lustre_audit/update_scan.py
@@ -47,7 +47,6 @@ def audit_host(self):
        self.update_resource_locations()

        self.update_target_mounts()
        self.update_client_mounts()

    def run(self, host_id, host_data):
        host = ManagedHost.objects.get(pk=host_id)
@@ -125,45 +124,6 @@ def _updates_available(installed_versions, available_versions):
        log.info("update_packages(%s): updates=%s" % (self.host, updates))
        job_scheduler_notify.notify(self.host, self.started_at, {"needs_update": updates})

    def update_client_mounts(self):
        # Client mount audit comes in via metrics due to the way the
        # ClientAudit is implemented.
        try:
            client_mounts = self.host_data["metrics"]["raw"]["lustre_client_mounts"]
        except KeyError:
            client_mounts = []

        # If lustre_client_mounts is None then nothing changed since the last update and so we can just return.
        # Not the same as [] empty list which means no mounts
        if client_mounts is None:
            return

        expected_fs_mounts = LustreClientMount.objects.filter(host=self.host)
        actual_fs_mounts = [m["mountspec"].split(":/")[1] for m in client_mounts]

        # Don't bother with the rest if there's nothing to do.
        if len(expected_fs_mounts) == 0 and len(actual_fs_mounts) == 0:
            return

        for expected_mount in expected_fs_mounts:
            if expected_mount.active and expected_mount.filesystem not in actual_fs_mounts:
                update = dict(state="unmounted", mountpoint=None)
                job_scheduler_notify.notify(expected_mount, self.started_at, update)
                log.info("updated mount %s on %s -> inactive" % (expected_mount.mountpoint, self.host))

        for actual_mount in client_mounts:
            fsname = actual_mount["mountspec"].split(":/")[1]
            try:
                mount = [m for m in expected_fs_mounts if m.filesystem == fsname][0]
                log.debug("mount: %s" % mount)
                if not mount.active:
                    update = dict(state="mounted", mountpoint=actual_mount["mountpoint"])
                    job_scheduler_notify.notify(mount, self.started_at, update)
                    log.info("updated mount %s on %s -> active" % (actual_mount["mountpoint"], self.host))
            except IndexError:
                log.info("creating new mount %s on %s" % (actual_mount["mountpoint"], self.host))
                JobSchedulerClient.create_client_mount(self.host, fsname, actual_mount["mountpoint"])

    def update_target_mounts(self):
        # If mounts is None then nothing changed since the last update and so we can just return.
        # Not the same as [] empty list which means no mounts
2 changes: 2 additions & 0 deletions iml-services/iml-device/Cargo.toml
@@ -5,9 +5,11 @@ authors = ["IML Team <iml@whamcloud.com>"]
edition = "2018"

[dependencies]
chrono = "0.4"
device-types = "0.1"
diesel = { version = "1.4", default_features = false, features = ["postgres", "r2d2", "chrono", "serde_json"] }
futures = "0.3"
im = { version = "13.0", features = ["serde"] }
iml-manager-env = { path = "../../iml-manager-env", version = "0.3" }
iml-orm = { path = "../../iml-orm", version = "0.3", features = ["postgres-interop"] }
iml-postgres = { path = "../../iml-postgres", version = "0.3" }
159 changes: 133 additions & 26 deletions iml-services/iml-device/src/main.rs
@@ -2,9 +2,10 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

use device_types::devices::Device;
use diesel::{self, pg::upsert::excluded, prelude::*};
use device_types::{devices::Device, mount::Mount};
use diesel::{self, dsl, pg::upsert::excluded, prelude::*, sql_types};
use futures::{lock::Mutex, TryFutureExt, TryStreamExt};
use im::HashSet;
use iml_device::{
    linux_plugin_transforms::{
        build_device_lookup, devtree2linuxoutput, get_shared_pools, populate_zpool, update_vgs,
@@ -13,9 +14,12 @@ use iml_device::{
    ImlDeviceError,
};
use iml_orm::{
    models::{ChromaCoreDevice, NewChromaCoreDevice},
    schema::chroma_core_device::{devices, fqdn, table},
    tokio_diesel::*,
    models::{ChromaCoreDevice, ChromaCoreLustreclientmount, NewChromaCoreDevice},
    schema::{
        chroma_core_device as dvc, chroma_core_lustreclientmount as clmnt,
        chroma_core_managedhost as host, django_content_type as djct,
    },
    tokio_diesel::{AsyncRunQueryDsl as _, OptionalExtension as _},
    DbPool,
};
use iml_service_queue::service_queue::consume_data;
@@ -29,7 +33,7 @@ use warp::Filter;
type Cache = Arc<Mutex<HashMap<Fqdn, Device>>>;

async fn create_cache(pool: &DbPool) -> Result<Cache, ImlDeviceError> {
    let data: HashMap<Fqdn, Device> = table
    let data: HashMap<Fqdn, Device> = dvc::table
        .load_async(&pool)
        .await?
        .into_iter()
@@ -45,6 +49,116 @@ async fn create_cache(pool: &DbPool) -> Result<Cache, ImlDeviceError> {
    Ok(Arc::new(Mutex::new(data)))
}

async fn update_devices(
    pool: &DbPool,
    host: &Fqdn,
    devices: &Device,
) -> Result<(), ImlDeviceError> {
    let device_to_insert = NewChromaCoreDevice {
        fqdn: host.to_string(),
        devices: serde_json::to_value(devices)
            .expect("Could not convert incoming Devices to JSON."),
    };

    let new_device = diesel::insert_into(dvc::table)
        .values(device_to_insert)
        .on_conflict(dvc::fqdn)
        .do_update()
        .set(dvc::devices.eq(excluded(dvc::devices)))
        .get_result_async::<ChromaCoreDevice>(&pool)
        .await?;
    tracing::info!("Inserted devices from host '{}'", host);
    tracing::debug!("Inserted {:?}", new_device);
    Ok(())
}

async fn update_client_mounts(
    pool: &DbPool,
    ct_id: Option<i32>,
    host: &Fqdn,
    mounts: &HashSet<Mount>,
) -> Result<(), ImlDeviceError> {
    let host_id = match host::table
        .select(host::id)
        .filter(host::fqdn.eq(host.to_string()))
        .get_result_async::<i32>(&pool)
        .await
        .optional()?
    {
        Some(id) => id,
        None => {
            tracing::warn!("Host '{}' is unknown", host);
            return Ok(());
        }
    };

    let lustre_mounts = mounts
        .into_iter()
        .filter(|m| m.fs_type.0 == "lustre")
        .filter_map(|m| {
            m.source
                .0
                .to_str()
                .and_then(|p| p.splitn(2, ":/").nth(1))
                .map(|fs| (fs.to_string(), m.target.0.to_str().map(String::from)))
        })
        .collect::<Vec<_>>();

    tracing::debug!(
        "Client mounts at {}({}): {:?}",
        host,
        host_id,
        &lustre_mounts
    );

    let state_modified_at = chrono::offset::Utc::now().into_sql::<sql_types::Timestamptz>();

    let xs: Vec<_> = lustre_mounts
        .into_iter()
        .map(|(fs, tg)| {
            (
                clmnt::host_id.eq(host_id),
                clmnt::filesystem.eq(fs),
                clmnt::mountpoint.eq(tg),
                clmnt::state.eq("mounted"),
                clmnt::state_modified_at.eq(state_modified_at),
                clmnt::immutable_state.eq(false),
                clmnt::not_deleted.eq(true),
                clmnt::content_type_id.eq(ct_id),
            )
        })
        .collect();

    let existing_mounts = diesel::insert_into(clmnt::table)
        .values(xs)
        .on_conflict((clmnt::host_id, clmnt::filesystem, clmnt::not_deleted))
        .do_update()
        .set((
            clmnt::mountpoint.eq(excluded(clmnt::mountpoint)),
            clmnt::state.eq(excluded(clmnt::state)),
            clmnt::state_modified_at.eq(state_modified_at),
        ))
        .get_results_async::<ChromaCoreLustreclientmount>(&pool)
        .await?;

    let fs_names: Vec<String> = existing_mounts.into_iter().map(|c| c.filesystem).collect();

    let updated = diesel::update(clmnt::table)
        .filter(clmnt::filesystem.ne(dsl::all(fs_names)))
        .filter(clmnt::host_id.eq(host_id))
        .set((
            clmnt::mountpoint.eq(Option::<String>::None),
            clmnt::state.eq("unmounted".as_sql::<sql_types::Text>()),
            clmnt::state_modified_at.eq(state_modified_at),
        ))
        .get_results_async::<ChromaCoreLustreclientmount>(&pool)
        .await?;

    tracing::debug!("Updated client mounts: {:?}", updated);

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), ImlDeviceError> {
iml_tracing::init();
@@ -106,29 +220,22 @@ async fn main() -> Result<(), ImlDeviceError> {

    tokio::spawn(server);

    let mut s = consume_data::<Device>("rust_agent_device_rx");
    let mut s = consume_data::<(Device, HashSet<Mount>)>("rust_agent_device_rx");

    while let Some((f, d)) = s.try_next().await? {
        let mut cache = cache2.lock().await;

        cache.insert(f.clone(), d.clone());
    // Django's artifact:
    let lustreclientmount_ct_id = djct::table
        .select(djct::id)
        .filter(djct::model.eq("lustreclientmount"))
        .first_async::<i32>(&pool)
        .await
        .optional()?;

        let device_to_insert = NewChromaCoreDevice {
            fqdn: f.to_string(),
            devices: serde_json::to_value(d).expect("Could not convert incoming Devices to JSON."),
        };

        let new_device = diesel::insert_into(table)
            .values(device_to_insert)
            .on_conflict(fqdn)
            .do_update()
            .set(devices.eq(excluded(devices)))
            .get_result_async::<ChromaCoreDevice>(&pool)
            .await
            .expect("Error saving new device");
    while let Some((host, (devices, mounts))) = s.try_next().await? {
        let mut cache = cache2.lock().await;
        cache.insert(host.clone(), devices.clone());

        tracing::info!("Inserted device from host {}", new_device.fqdn);
        tracing::trace!("Inserted device {:?}", new_device);
        update_devices(&pool, &host, &devices).await?;
        update_client_mounts(&pool, lustreclientmount_ct_id, &host, &mounts).await?;
    }

    Ok(())
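
Aside (not part of the commit): both the removed Python (m["mountspec"].split(":/")[1]) and the new Rust in update_client_mounts (splitn(2, ":/").nth(1)) derive the Lustre filesystem name from the mount source. A small runnable sketch of that extraction, with illustrative mount sources:

// Mirrors the splitn(2, ":/").nth(1) call in update_client_mounts.
fn fs_name(source: &str) -> Option<&str> {
    source.splitn(2, ":/").nth(1)
}

fn main() {
    assert_eq!(fs_name("10.0.0.1@tcp:/testfs"), Some("testfs"));
    // Splitting only on the first ":/" keeps failover-style specs working.
    assert_eq!(fs_name("mgs1@tcp:mgs2@tcp:/testfs"), Some("testfs"));
    // Sources without ":/" yield None; non-Lustre mounts are already filtered out by fs_type.
    assert_eq!(fs_name("/dev/sda1"), None);
    println!("ok");
}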
