Skip to content
This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Commit

Permalink
Fix Stat deletion (#2263)
Browse files Browse the repository at this point in the history
* Implement influx client extension

Implement an influx client extension that allows for direct
serialization to a struct instead of needing to traverse the result and
build it by hand. Ex:

```rust
struct MgsFsTime {
    time: u64,
}

let xs: Vec<MgsFsTime> = client
        .query_into(
            format!(
                r#"
            SELECT mgs_fs,is_mgs_fs
            FROM target
            WHERE mgs_fs='{}' AND is_mgs_fs=true
            ORDER BY time ASC
            LIMIT 2"#,
                fs_names
            )
            .as_str(),
            Some(Precision::Nanoseconds),
        )
        .await?
        .unwrap_or_default();
```

Signed-off-by: Joe Grund <jgrund@whamcloud.io>

* Fix Stat deletion

The current targets table is intermittently losing its filesystems for
some targets.

This appears to be because we delete stat info before inserting the next
batch. If anyone queries during this gap, they will get empty data even
though we have the points — they just haven't been inserted yet.

In addition, we are clearing out all stats except a single point for any target colocated on the MGS node.

This patch waits until after insertion to do a deletion, and scopes the
delete so it should only affect the correct stat.

Fixes #2258.

Signed-off-by: Joe Grund <jgrund@whamcloud.io>
  • Loading branch information
jgrund authored Sep 23, 2020
1 parent e1ee6d1 commit de268f6
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 216 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions iml-services/iml-device/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ device-types = {path = "../../device-scanner/device-types", version = "0.2"}
futures = "0.3"
im = {version = "15.0", features = ["serde"]}
iml-change = {path = "../../iml-change", version = "0.1"}
iml-influx = {path = "../../iml-influx", version = "0.1"}
iml-influx = {path = "../../iml-influx", version = "0.1", features = ["with-db-client"]}
iml-manager-env = {path = "../../iml-manager-env", version = "0.3"}
iml-postgres = {path = "../../iml-postgres", version = "0.3"}
iml-rabbit = {path = "../../iml-rabbit", version = "0.3"}
iml-service-queue = {path = "../iml-service-queue", version = "0.3"}
iml-tracing = {version = "0.2", path = "../../iml-tracing"}
iml-wire-types = {path = "../../iml-wire-types", version = "0.3", features = ["postgres-interop"]}
influx_db_client = {version = "0.4", default-features = false, features = ["rustls-tls"]}
serde = {version = "1", features = ["derive"]}
serde_json = "1.0"
thiserror = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion iml-services/iml-device/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub enum ImlDeviceError {
#[error(transparent)]
SqlxMigrateError(#[from] sqlx::migrate::MigrateError),
#[error(transparent)]
InfluxDbError(#[from] influx_db_client::error::Error),
ImlInfluxError(#[from] iml_influx::Error),
}

impl reject::Reject for ImlDeviceError {}
231 changes: 85 additions & 146 deletions iml-services/iml-device/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,18 @@
pub mod error;
pub mod linux_plugin_transforms;

pub use error::ImlDeviceError;

use device_types::{
devices::{Device, DeviceId},
mount::Mount,
};
pub use error::ImlDeviceError;
use futures::{future::try_join_all, lock::Mutex};
use im::HashSet;
use iml_change::*;
use iml_influx::ColVals;
use iml_influx::{Client, InfluxClientExt as _, Precision};
use iml_postgres::sqlx::{self, PgPool};
use iml_tracing::tracing;
use iml_wire_types::Fqdn;
use influx_db_client::{keys::Node, Client, Precision};
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
sync::Arc,
Expand Down Expand Up @@ -501,53 +499,45 @@ pub fn build_updates(x: Changes<'_, Target>) -> Vec<Target> {
}
}

fn parse_filesystem_data(query_result: Option<Vec<Node>>) -> TargetFsRecord {
let target_to_fs = if let Some(nodes) = query_result {
let items = nodes
.into_iter()
.filter_map(|x| x.series)
.flatten()
.map(|x| -> serde_json::Value { ColVals(x.columns, x.values).into() })
.map(|x| {
let fs_record: Vec<FsRecord> =
serde_json::from_value(x).expect("Couldn't convert to record.");
fs_record
})
.flatten()
.map(|x| {
let filesystems: String = x.filesystems();
let host: String = x.host;
let target: String = x.target;
fn parse_filesystem_data(query_result: Option<Vec<FsRecord>>) -> TargetFsRecord {
let x = match query_result {
Some(x) => x,
None => return HashMap::new(),
};

(
target,
filesystems
.split(',')
.map(|x| (Fqdn(host.clone()), x.to_string()))
.collect(),
)
})
.collect::<Vec<(String, Vec<(Fqdn, String)>)>>();
let items = x
.into_iter()
.map(|x| {
let filesystems: String = x.filesystems();
let host: String = x.host;
let target: String = x.target;

items.into_iter().fold(
HashMap::new(),
|mut acc: HashMap<String, Vec<(Fqdn, String)>>, xs| {
let existing = acc.remove(xs.0.as_str());
(
target,
filesystems
.split(',')
.map(|x| (Fqdn(host.clone()), x.to_string()))
.collect(),
)
})
.collect::<Vec<(String, Vec<(Fqdn, String)>)>>();

let x = if let Some(entry) = existing {
[&entry[..], &xs.1[..]].concat()
} else {
xs.1
};
let target_to_fs = items.into_iter().fold(
HashMap::new(),
|mut acc: HashMap<String, Vec<(Fqdn, String)>>, xs| {
let existing = acc.remove(xs.0.as_str());

acc.insert(xs.0, x);
let x = if let Some(entry) = existing {
[&entry[..], &xs.1[..]].concat()
} else {
xs.1
};

acc
},
)
} else {
HashMap::new()
};
acc.insert(xs.0, x);

acc
},
);

tracing::debug!("target_to_fs: {:?}", target_to_fs);

Expand All @@ -557,8 +547,8 @@ fn parse_filesystem_data(query_result: Option<Vec<Node>>) -> TargetFsRecord {
pub async fn get_target_filesystem_map(
influx_client: &Client,
) -> Result<TargetFsRecord, ImlDeviceError> {
let query_result: Option<Vec<Node>> = influx_client
.query(
let query_result: Option<Vec<FsRecord>> = influx_client
.query_into(
"select host,target,fs,bytes_free from target group by target order by time desc limit 1;",
Some(Precision::Nanoseconds),
)
Expand All @@ -571,9 +561,9 @@ pub async fn get_mgs_filesystem_map(
influx_client: &Client,
mounts: &HashMap<Fqdn, HashSet<Mount>>,
) -> Result<TargetFsRecord, ImlDeviceError> {
let query_result: Option<Vec<Node>> = influx_client
.query(
"select host,target,mgs_fs from target;",
let query_result: Option<Vec<FsRecord>> = influx_client
.query_into(
"select host,target,mgs_fs,is_mgs_fs from target order by time desc limit 1;",
Some(Precision::Nanoseconds),
)
.await?;
Expand Down Expand Up @@ -614,9 +604,7 @@ pub async fn get_mgs_filesystem_map(
mod tests {
use super::*;
use device_types::devices::Device;
use influx_db_client::keys::Series;
use insta::assert_json_snapshot;
use serde_json::{json, Map, Value};

#[test]
fn test_index() {
Expand Down Expand Up @@ -729,40 +717,20 @@ mod tests {

#[test]
fn test_parse_target_filesystem_data() {
let query_result = Some(vec![Node {
statement_id: Some(0),
series: Some(vec![Series {
name: "target".to_string(),
tags: Some(
vec![("target".to_string(), json!("fs-OST0000".to_string()))]
.into_iter()
.collect::<Map<String, Value>>(),
),
columns: vec![
"time".into(),
"host".into(),
"target".into(),
"fs".into(),
"bytes_free".into(),
],
values: vec![
vec![
json!(1597166951257510515 as i64),
json!("oss1"),
json!("fs-OST0009"),
json!("fs"),
json!(4913020928 as i64),
],
vec![
json!(1597166951257510515 as i64),
json!("oss1"),
json!("fs-OST0008"),
json!("fs2"),
json!(4913020928 as i64),
],
],
}]),
}]);
let query_result = Some(vec![
FsRecord {
host: "oss1".into(),
target: "fs-OST0009".into(),
fs: Some("fs".into()),
mgs_fs: None,
},
FsRecord {
host: "oss1".into(),
target: "fs-OST0008".into(),
fs: Some("fs2".into()),
mgs_fs: None,
},
]);

let result = parse_filesystem_data(query_result);
assert_eq!(
Expand All @@ -784,39 +752,23 @@ mod tests {

#[test]
fn test_parse_mgs_filesystem_data() {
let query_result = Some(vec![Node {
statement_id: Some(0),
series: Some(vec![Series {
name: "target".to_string(),
tags: Some(
vec![("target".to_string(), json!("MGS".to_string()))]
.into_iter()
.collect::<Map<String, Value>>(),
),
columns: vec![
"time".into(),
"host".into(),
"target".into(),
"mgs_fs".into(),
],
values: vec![
vec![
json!(1597166951257510515 as i64),
json!("mds1"),
json!("MGS"),
json!("mgs1fs1,mgs1fs2"),
],
vec![
json!(1597166951257510515 as i64),
json!("mds1"),
json!("MGS2"),
json!("mgs2fs1,mgs2fs2"),
],
],
}]),
}]);
let query_result = Some(vec![
FsRecord {
host: "mds1".into(),
target: "MGS".into(),
fs: None,
mgs_fs: Some("mgs1fs1,mgs1fs2".into()),
},
FsRecord {
host: "mds1".into(),
target: "MGS2".into(),
fs: None,
mgs_fs: Some("mgs2fs1,mgs2fs2".into()),
},
]);

let result = parse_filesystem_data(query_result);

assert_eq!(
result,
vec![
Expand All @@ -842,36 +794,23 @@ mod tests {

#[test]
fn test_parse_mgs_filesystem_data_on_separate_hosts() {
let query_result = Some(vec![Node {
statement_id: Some(0),
series: Some(vec![Series {
name: "target".to_string(),
tags: None,
columns: vec![
"time".into(),
"host".into(),
"target".into(),
"mgs_fs".into(),
],
values: vec![
vec![
json!(1597166951257510515 as i64),
json!("mds1"),
json!("MGS"),
json!("fs1"),
],
vec![
json!(1597166951257510515 as i64),
json!("oss1"),
json!("MGS"),
json!("fs2"),
],
],
}]),
}]);
let query_result = Some(vec![
FsRecord {
host: "mds1".into(),
target: "MGS".into(),
fs: None,
mgs_fs: Some("fs1".into()),
},
FsRecord {
host: "oss1".into(),
target: "MGS".into(),
fs: None,
mgs_fs: Some("fs2".into()),
},
]);

let result = parse_filesystem_data(query_result);
println!("result: {:?}", result);

assert_eq!(
result,
vec![(
Expand Down
2 changes: 1 addition & 1 deletion iml-services/iml-device/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ use iml_device::{
},
update_client_mounts, update_devices, Cache, ImlDeviceError, TargetFsRecord,
};
use iml_influx::Client;
use iml_manager_env::{get_influxdb_addr, get_influxdb_metrics_db, get_pool_limit};
use iml_postgres::{get_db_pool, sqlx};
use iml_service_queue::service_queue::consume_data;
use iml_tracing::tracing;
use iml_wire_types::Fqdn;
use influx_db_client::Client;
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
Expand Down
5 changes: 3 additions & 2 deletions iml-services/iml-stats/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ version = "0.3.0"

[dependencies]
futures = "0.3"
iml-influx = {path = "../../iml-influx", version = "0.1", features = ["with-db-client"]}
iml-manager-env = {path = "../../iml-manager-env", version = "0.3"}
iml-rabbit = {path = "../../iml-rabbit", version = "0.3"}
iml-service-queue = {path = "../iml-service-queue", version = "0.3"}
iml-tracing = {version = "0.2", path = "../../iml-tracing"}
iml-wire-types = {path = "../../iml-wire-types", version = "0.3"}
influx_db_client = {version = "0.4", default-features = false, features = ["rustls-tls"]}
lustre_collector = "0.2.15"
thiserror = "1.0"
serde = {version = "1", features = ["derive"]}
serde_json = "1.0"
thiserror = "1.0"
tokio = {version = "0.2", features = ["macros", "rt-threaded"]}
tracing = "0.1"
url = "2.1.1"
2 changes: 1 addition & 1 deletion iml-services/iml-stats/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub enum ImlStatsError {
#[error(transparent)]
ImlServiceQueueError(#[from] ImlServiceQueueError),
#[error(transparent)]
InfluxDbError(#[from] influx_db_client::Error),
ImlInfluxError(#[from] iml_influx::Error),
#[error(transparent)]
SystemTimeError(#[from] std::time::SystemTimeError),
#[error(transparent)]
Expand Down
Loading

0 comments on commit de268f6

Please sign in to comment.