Skip to content
This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Commit

Permalink
Implement influx client extension
Browse files Browse the repository at this point in the history
Implement an influx client extenstion that allows for direct
serialization to a struct instead of needing to traverse the result and
build it by hand. Ex:

```rust
struct MgsFsTime {
    time: u64,
}

let xs: Vec<MgsFsTime> = client
        .query_into(
            format!(
                r#"
            SELECT mgs_fs,is_mgs_fs
            FROM target
            WHERE mgs_fs='{}' AND is_mgs_fs=true
            ORDER BY time ASC
            LIMIT 2"#,
                fs_names
            )
            .as_str(),
            Some(Precision::Nanoseconds),
        )
        .await?
        .unwrap_or_default();
```

Signed-off-by: Joe Grund <jgrund@whamcloud.io>
  • Loading branch information
jgrund committed Sep 22, 2020
1 parent e033bf5 commit ed8796f
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 20 deletions.
41 changes: 26 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions iml-influx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,14 @@ name = "iml-influx"
version = "0.1.0"

[dependencies]
futures = {version = "0.3", optional = true}
influx_db_client = {version = "0.4", default-features = false, features = ["rustls-tls"], optional = true}
serde = {version = "1", features = ['derive']}
serde_json = {version = "1"}
thiserror = {version = "1.0", optional = true}

[dev-dependencies]
influx_db_client = {version = "0.4", default-features = false, features = ["rustls-tls"]}

[features]
with-db-client = ["influx_db_client", "futures", "thiserror"]
67 changes: 62 additions & 5 deletions iml-influx/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,69 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

use std::collections::HashMap;

pub mod filesystem;
pub mod filesystems;

use serde_json::{Map, Value};
#[cfg(feature = "with-db-client")]
use futures::{future::BoxFuture, FutureExt};
#[cfg(feature = "with-db-client")]
pub use influx_db_client::{Client, Point, Points, Precision, Value};
use serde_json::Map;
use std::collections::HashMap;

#[cfg(feature = "with-db-client")]
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
InfluxDbError(#[from] influx_db_client::Error),
#[error(transparent)]
Serde(#[from] serde_json::Error),
}

#[cfg(feature = "with-db-client")]
pub trait InfluxClientExt {
fn query_into<T: serde::de::DeserializeOwned>(
&self,
q: &str,
epoch: Option<Precision>,
) -> BoxFuture<Result<Option<Vec<T>>, Error>>;
}

#[cfg(feature = "with-db-client")]
impl InfluxClientExt for Client {
fn query_into<T: serde::de::DeserializeOwned>(
&self,
q: &str,
epoch: Option<Precision>,
) -> BoxFuture<Result<Option<Vec<T>>, Error>> {
let q = self.query(q, epoch);

async move {
let r = q.await?;

let x = if let Some(nodes) = r {
let items = nodes
.into_iter()
.filter_map(|x| x.series)
.flatten()
.map(|x| -> serde_json::Value { ColVals(x.columns, x.values).into() })
.map(|x| -> Result<Vec<T>, Error> {
let x = serde_json::from_value(x)?;

Ok(x)
})
.collect::<Result<Vec<Vec<T>>, _>>()?;

Some(Ok(items.into_iter().flatten().collect()))
} else {
None
};

x.transpose()
}
.boxed()
}
}

#[derive(serde::Deserialize, Clone, Debug)]
pub struct InfluxResponse<T> {
Expand All @@ -34,10 +91,10 @@ impl From<ColVals> for serde_json::Value {
.map(|y| -> Map<String, serde_json::Value> {
cols.clone().into_iter().zip(y).collect()
})
.map(Value::Object)
.map(serde_json::Value::Object)
.collect();

Value::Array(xs)
serde_json::Value::Array(xs)
}
}

Expand Down

0 comments on commit ed8796f

Please sign in to comment.