This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Monitor client mounts in iml-device #1911

Merged · 1 commit · Jun 30, 2020
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

40 changes: 0 additions & 40 deletions chroma_core/services/lustre_audit/update_scan.py
@@ -47,7 +47,6 @@ def audit_host(self):
         self.update_resource_locations()
 
         self.update_target_mounts()
-        self.update_client_mounts()
 
     def run(self, host_id, host_data):
         host = ManagedHost.objects.get(pk=host_id)
@@ -125,45 +124,6 @@ def _updates_available(installed_versions, available_versions):
         log.info("update_packages(%s): updates=%s" % (self.host, updates))
         job_scheduler_notify.notify(self.host, self.started_at, {"needs_update": updates})
 
-    def update_client_mounts(self):
-        # Client mount audit comes in via metrics due to the way the
-        # ClientAudit is implemented.
-        try:
-            client_mounts = self.host_data["metrics"]["raw"]["lustre_client_mounts"]
-        except KeyError:
-            client_mounts = []
-
-        # If lustre_client_mounts is None then nothing changed since the last update and so we can just return.
-        # Not the same as [] empty list which means no mounts
-        if client_mounts is None:
-            return
-
-        expected_fs_mounts = LustreClientMount.objects.filter(host=self.host)
-        actual_fs_mounts = [m["mountspec"].split(":/")[1] for m in client_mounts]
-
-        # Don't bother with the rest if there's nothing to do.
-        if len(expected_fs_mounts) == 0 and len(actual_fs_mounts) == 0:
-            return
-
-        for expected_mount in expected_fs_mounts:
-            if expected_mount.active and expected_mount.filesystem not in actual_fs_mounts:
-                update = dict(state="unmounted", mountpoint=None)
-                job_scheduler_notify.notify(expected_mount, self.started_at, update)
-                log.info("updated mount %s on %s -> inactive" % (expected_mount.mountpoint, self.host))
-
-        for actual_mount in client_mounts:
-            fsname = actual_mount["mountspec"].split(":/")[1]
-            try:
-                mount = [m for m in expected_fs_mounts if m.filesystem == fsname][0]
-                log.debug("mount: %s" % mount)
-                if not mount.active:
-                    update = dict(state="mounted", mountpoint=actual_mount["mountpoint"])
-                    job_scheduler_notify.notify(mount, self.started_at, update)
-                    log.info("updated mount %s on %s -> active" % (actual_mount["mountpoint"], self.host))
-            except IndexError:
-                log.info("creating new mount %s on %s" % (actual_mount["mountpoint"], self.host))
-                JobSchedulerClient.create_client_mount(self.host, fsname, actual_mount["mountpoint"])
-
     def update_target_mounts(self):
         # If mounts is None then nothing changed since the last update and so we can just return.
         # Not the same as [] empty list which means no mounts
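The removed Python above (m["mountspec"].split(":/")[1]) and its Rust replacement in iml-device both derive the filesystem name from the client mount's source spec. A minimal standalone sketch of that parsing, assuming the usual <mgsnode>:/<fsname> mountspec shape (the NID in the example is made up):

fn fsname_from_mountspec(spec: &str) -> Option<&str> {
    // Split on the first ":/" only, mirroring splitn(2, ":/").nth(1)
    // in the new update_client_mounts; non-Lustre specs yield None.
    spec.splitn(2, ":/").nth(1)
}

fn main() {
    assert_eq!(fsname_from_mountspec("10.14.82.0@tcp:/testfs"), Some("testfs"));
    assert_eq!(fsname_from_mountspec("/dev/sda1"), None);
}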
2 changes: 1 addition & 1 deletion device-scanner/device-scanner-daemon/src/state.rs
@@ -486,7 +486,7 @@ pub fn produce_device_graph(state: &state::State) -> Result<bytes::Bytes> {
 
     build_device_graph(&mut root, &dev_list, &state.local_mounts)?;
 
-    let v = serde_json::to_string(&root)?;
+    let v = serde_json::to_string(&(&root, &state.local_mounts))?;
     let b = bytes::BytesMut::from(v + "\n");
     Ok(b.freeze())
 }
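This one-line change alters the scanner's wire format: serializing the tuple (&root, &state.local_mounts) produces a two-element JSON array of device graph plus local mounts, which matches the consume_data::<(Device, HashSet<Mount>)> side of iml-device below. A minimal sketch of that framing, using stand-in types rather than the real Device and Mount:

use std::collections::BTreeSet;

fn main() -> serde_json::Result<()> {
    // Stand-ins for the device graph root and the local mount set.
    let root = "device-graph";
    let mounts: BTreeSet<&str> = ["/mnt/testfs"].iter().copied().collect();

    // A tuple of references serializes as a two-element JSON array, so
    // each newline-delimited report is now [graph, mounts] on the wire.
    let v = serde_json::to_string(&(&root, &mounts))?;
    assert_eq!(v, r#"["device-graph",["/mnt/testfs"]]"#);

    Ok(())
}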
2 changes: 2 additions & 0 deletions iml-services/iml-device/Cargo.toml
@@ -5,9 +5,11 @@ authors = ["IML Team <iml@whamcloud.com>"]
 edition = "2018"
 
 [dependencies]
+chrono = "0.4"
 device-types = "0.1"
 diesel = { version = "1.4", default_features = false, features = ["postgres", "r2d2", "chrono", "serde_json"] }
 futures = "0.3"
+im = { version = "13.0", features = ["serde"] }
 iml-manager-env = { path = "../../iml-manager-env", version = "0.3" }
 iml-orm = { path = "../../iml-orm", version = "0.3", features = ["postgres-interop"] }
 iml-postgres = { path = "../../iml-postgres", version = "0.3" }
159 changes: 133 additions & 26 deletions iml-services/iml-device/src/main.rs
@@ -2,9 +2,10 @@
 // Use of this source code is governed by a MIT-style
 // license that can be found in the LICENSE file.
 
-use device_types::devices::Device;
-use diesel::{self, pg::upsert::excluded, prelude::*};
+use device_types::{devices::Device, mount::Mount};
+use diesel::{self, dsl, pg::upsert::excluded, prelude::*, sql_types};
 use futures::{lock::Mutex, TryFutureExt, TryStreamExt};
+use im::HashSet;
 use iml_device::{
     linux_plugin_transforms::{
         build_device_lookup, devtree2linuxoutput, get_shared_pools, populate_zpool, update_vgs,
@@ -13,9 +14,12 @@ use iml_device::{
     ImlDeviceError,
 };
 use iml_orm::{
-    models::{ChromaCoreDevice, NewChromaCoreDevice},
-    schema::chroma_core_device::{devices, fqdn, table},
-    tokio_diesel::*,
+    models::{ChromaCoreDevice, ChromaCoreLustreclientmount, NewChromaCoreDevice},
+    schema::{
+        chroma_core_device as dvc, chroma_core_lustreclientmount as clmnt,
+        chroma_core_managedhost as host, django_content_type as djct,
+    },
+    tokio_diesel::{AsyncRunQueryDsl as _, OptionalExtension as _},
     DbPool,
 };
 use iml_service_queue::service_queue::consume_data;
@@ -29,7 +33,7 @@ use warp::Filter;
 type Cache = Arc<Mutex<HashMap<Fqdn, Device>>>;
 
 async fn create_cache(pool: &DbPool) -> Result<Cache, ImlDeviceError> {
-    let data: HashMap<Fqdn, Device> = table
+    let data: HashMap<Fqdn, Device> = dvc::table
         .load_async(&pool)
         .await?
         .into_iter()
@@ -45,6 +49,116 @@ async fn create_cache(pool: &DbPool) -> Result<Cache, ImlDeviceError> {
     Ok(Arc::new(Mutex::new(data)))
 }
 
+async fn update_devices(
+    pool: &DbPool,
+    host: &Fqdn,
+    devices: &Device,
+) -> Result<(), ImlDeviceError> {
+    let device_to_insert = NewChromaCoreDevice {
+        fqdn: host.to_string(),
+        devices: serde_json::to_value(devices)
+            .expect("Could not convert incoming Devices to JSON."),
+    };
+
+    let new_device = diesel::insert_into(dvc::table)
+        .values(device_to_insert)
+        .on_conflict(dvc::fqdn)
+        .do_update()
+        .set(dvc::devices.eq(excluded(dvc::devices)))
+        .get_result_async::<ChromaCoreDevice>(&pool)
+        .await?;
+
+    tracing::info!("Inserted devices from host '{}'", host);
+    tracing::debug!("Inserted {:?}", new_device);
+
+    Ok(())
+}
+
+async fn update_client_mounts(
+    pool: &DbPool,
+    ct_id: Option<i32>,
+    host: &Fqdn,
+    mounts: &HashSet<Mount>,
+) -> Result<(), ImlDeviceError> {
+    let host_id = match host::table
+        .select(host::id)
+        .filter(host::fqdn.eq(host.to_string()))
+        .get_result_async::<i32>(&pool)
+        .await
+        .optional()?
+    {
+        Some(id) => id,
+        None => {
+            tracing::warn!("Host '{}' is unknown", host);
+            return Ok(());
+        }
+    };
+
+    let lustre_mounts = mounts
+        .into_iter()
+        .filter(|m| m.fs_type.0 == "lustre")
+        .filter_map(|m| {
+            m.source
+                .0
+                .to_str()
+                .and_then(|p| p.splitn(2, ":/").nth(1))
+                .map(|fs| (fs.to_string(), m.target.0.to_str().map(String::from)))
+        })
+        .collect::<Vec<_>>();
+
+    tracing::debug!(
+        "Client mounts at {}({}): {:?}",
+        host,
+        host_id,
+        &lustre_mounts
+    );
+
+    let state_modified_at = chrono::offset::Utc::now().into_sql::<sql_types::Timestamptz>();
+
+    let xs: Vec<_> = lustre_mounts
+        .into_iter()
+        .map(|(fs, tg)| {
+            (
+                clmnt::host_id.eq(host_id),
+                clmnt::filesystem.eq(fs),
+                clmnt::mountpoint.eq(tg),
+                clmnt::state.eq("mounted"),
+                clmnt::state_modified_at.eq(state_modified_at),
+                clmnt::immutable_state.eq(false),
+                clmnt::not_deleted.eq(true),
+                clmnt::content_type_id.eq(ct_id),
+            )
+        })
+        .collect();
+
+    let existing_mounts = diesel::insert_into(clmnt::table)
+        .values(xs)
+        .on_conflict((clmnt::host_id, clmnt::filesystem, clmnt::not_deleted))
+        .do_update()
+        .set((
+            clmnt::mountpoint.eq(excluded(clmnt::mountpoint)),
+            clmnt::state.eq(excluded(clmnt::state)),
+            clmnt::state_modified_at.eq(state_modified_at),
+        ))
+        .get_results_async::<ChromaCoreLustreclientmount>(&pool)
+        .await?;
+
+    let fs_names: Vec<String> = existing_mounts.into_iter().map(|c| c.filesystem).collect();
+
+    let updated = diesel::update(clmnt::table)
+        .filter(clmnt::filesystem.ne(dsl::all(fs_names)))
+        .filter(clmnt::host_id.eq(host_id))
+        .set((
+            clmnt::mountpoint.eq(Option::<String>::None),
+            clmnt::state.eq("unmounted".as_sql::<sql_types::Text>()),
+            clmnt::state_modified_at.eq(state_modified_at),
+        ))
+        .get_results_async::<ChromaCoreLustreclientmount>(&pool)
+        .await?;
+
+    tracing::debug!("Updated client mounts: {:?}", updated);
+
+    Ok(())
+}
 
 #[tokio::main]
 async fn main() -> Result<(), ImlDeviceError> {
     iml_tracing::init();
@@ -112,29 +226,22 @@ async fn main() -> Result<(), ImlDeviceError> {
 
     let ch = iml_rabbit::create_channel(&conn).await?;
 
-    let mut s = consume_data::<Device>(&ch, "rust_agent_device_rx");
+    let mut s = consume_data::<(Device, HashSet<Mount>)>(&ch, "rust_agent_device_rx");
 
-    while let Some((f, d)) = s.try_next().await? {
-        let mut cache = cache2.lock().await;
-
-        cache.insert(f.clone(), d.clone());
+    // Django's artifact:
+    let lustreclientmount_ct_id = djct::table
+        .select(djct::id)
+        .filter(djct::model.eq("lustreclientmount"))
+        .first_async::<i32>(&pool)
+        .await
+        .optional()?;
 
-        let device_to_insert = NewChromaCoreDevice {
-            fqdn: f.to_string(),
-            devices: serde_json::to_value(d).expect("Could not convert incoming Devices to JSON."),
-        };
-
-        let new_device = diesel::insert_into(table)
-            .values(device_to_insert)
-            .on_conflict(fqdn)
-            .do_update()
-            .set(devices.eq(excluded(devices)))
-            .get_result_async::<ChromaCoreDevice>(&pool)
-            .await
-            .expect("Error saving new device");
+    while let Some((host, (devices, mounts))) = s.try_next().await? {
+        let mut cache = cache2.lock().await;
+        cache.insert(host.clone(), devices.clone());
 
-        tracing::info!("Inserted device from host {}", new_device.fqdn);
-        tracing::trace!("Inserted device {:?}", new_device);
+        update_devices(&pool, &host, &devices).await?;
+        update_client_mounts(&pool, lustreclientmount_ct_id, &host, &mounts).await?;
     }
 
     Ok(())
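Where the removed Python audited expected vs. actual mounts row by row, update_client_mounts reconciles in two set-oriented statements: upsert every reported Lustre mount as "mounted", then flip any remaining row for that host to "unmounted". A minimal in-memory sketch of that reconciliation, with a plain HashMap standing in for the chroma_core_lustreclientmount table:

use std::collections::{HashMap, HashSet};

#[derive(Debug, PartialEq)]
struct MountRow {
    mountpoint: Option<String>,
    state: &'static str,
}

// db maps filesystem name -> row for one host; reported holds the
// (filesystem, mountpoint) pairs parsed from the incoming mount set.
fn reconcile(db: &mut HashMap<String, MountRow>, reported: &[(&str, &str)]) {
    let seen: HashSet<&str> = reported.iter().map(|(fs, _)| *fs).collect();

    // Phase 1: upsert reported mounts (the ON CONFLICT ... DO UPDATE insert).
    for (fs, target) in reported {
        db.insert(
            fs.to_string(),
            MountRow {
                mountpoint: Some(target.to_string()),
                state: "mounted",
            },
        );
    }

    // Phase 2: any known mount that was not reported is now unmounted
    // (the filesystem.ne(dsl::all(fs_names)) update in the PR).
    for (fs, row) in db.iter_mut() {
        if !seen.contains(fs.as_str()) {
            row.mountpoint = None;
            row.state = "unmounted";
        }
    }
}

fn main() {
    let mut db = HashMap::new();
    db.insert(
        "oldfs".to_string(),
        MountRow {
            mountpoint: Some("/mnt/oldfs".to_string()),
            state: "mounted",
        },
    );

    reconcile(&mut db, &[("testfs", "/mnt/testfs")]);

    assert_eq!(db["testfs"].state, "mounted");
    assert_eq!(db["oldfs"].state, "unmounted");
}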