diff --git a/Cargo.lock b/Cargo.lock index 06fccecb35..0f48ad9630 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1265,9 +1265,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.1.15" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3deed196b6e7f9e44a2ae8d94225d80302d81208b1bb673fd21fe634645c85a9" +checksum = "4c30f6d0bc6b00693347368a67d41b58f2fb851215ff1da49e90fe2c5c667151" dependencies = [ "libc", ] @@ -1325,6 +1325,12 @@ version = "1.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9" +[[package]] +name = "httpdate" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47" + [[package]] name = "humantime" version = "1.3.0" @@ -1342,9 +1348,9 @@ checksum = "3c1ad908cc71012b7bea4d0c53ba96a8cba9962f048fa68d143376143d863b7a" [[package]] name = "hyper" -version = "0.13.7" +version = "0.13.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e68a8dd9716185d9e64ea473ea6ef63529252e3e27623295a0378a19665d5eb" +checksum = "2f3afcfae8af5ad0576a31e768415edb627824129e8e5a29b8bfccb2f234e835" dependencies = [ "bytes", "futures-channel", @@ -1354,10 +1360,10 @@ dependencies = [ "http", "http-body", "httparse", + "httpdate", "itoa", "pin-project", "socket2", - "time 0.1.44", "tokio", "tower-service", "tracing", @@ -1658,9 +1664,11 @@ dependencies = [ name = "iml-influx" version = "0.1.0" dependencies = [ + "futures", "influx_db_client", "serde", "serde_json", + "thiserror", ] [[package]] @@ -1694,7 +1702,7 @@ dependencies = [ "serde", "serde_json", "thiserror", - "time 0.2.20", + "time 0.2.21", "tokio", ] @@ -2156,9 +2164,12 @@ dependencies = [ [[package]] name = "instant" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b141fdc7836c525d4d594027d318c84161ca17aaf8113ab1f81ab93ae897485" +checksum = "63312a18f7ea8760cdd0a7c5aac1a619752a246b833545e3e36d1f81f7cd9e66" +dependencies = [ + "cfg-if", +] [[package]] name = "iovec" @@ -3787,7 +3798,7 @@ dependencies = [ "sqlx-rt", "stringprep", "thiserror", - "time 0.2.20", + "time 0.2.21", "url", "whoami", ] @@ -4162,9 +4173,9 @@ dependencies = [ [[package]] name = "time" -version = "0.2.20" +version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d4953c513c9bf1b97e9cdd83f11d60c4b0a83462880a360d80d96953a953fee" +checksum = "2c2e31fb28e2a9f01f5ed6901b066c1ba2333c04b64dc61254142bafcb3feb2c" dependencies = [ "const_fn", "libc", diff --git a/iml-influx/Cargo.toml b/iml-influx/Cargo.toml index a3b04a3c44..9b5fa8fd37 100644 --- a/iml-influx/Cargo.toml +++ b/iml-influx/Cargo.toml @@ -6,8 +6,14 @@ name = "iml-influx" version = "0.1.0" [dependencies] +futures = {version = "0.3", optional = true} +influx_db_client = {version = "0.4", default-features = false, features = ["rustls-tls"], optional = true} serde = {version = "1", features = ['derive']} serde_json = {version = "1"} +thiserror = {version = "1.0", optional = true} [dev-dependencies] influx_db_client = {version = "0.4", default-features = false, features = ["rustls-tls"]} + +[features] +with-db-client = ["influx_db_client", "futures", "thiserror"] diff --git a/iml-influx/src/lib.rs b/iml-influx/src/lib.rs index 5717963cf0..131f80f81b 100644 --- a/iml-influx/src/lib.rs +++ b/iml-influx/src/lib.rs @@ -2,12 +2,69 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. -use std::collections::HashMap; - pub mod filesystem; pub mod filesystems; -use serde_json::{Map, Value}; +#[cfg(feature = "with-db-client")] +use futures::{future::BoxFuture, FutureExt}; +#[cfg(feature = "with-db-client")] +pub use influx_db_client::{Client, Point, Points, Precision, Value}; +use serde_json::Map; +use std::collections::HashMap; + +#[cfg(feature = "with-db-client")] +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error(transparent)] + InfluxDbError(#[from] influx_db_client::Error), + #[error(transparent)] + Serde(#[from] serde_json::Error), +} + +#[cfg(feature = "with-db-client")] +pub trait InfluxClientExt { + fn query_into( + &self, + q: &str, + epoch: Option, + ) -> BoxFuture>, Error>>; +} + +#[cfg(feature = "with-db-client")] +impl InfluxClientExt for Client { + fn query_into( + &self, + q: &str, + epoch: Option, + ) -> BoxFuture>, Error>> { + let q = self.query(q, epoch); + + async move { + let r = q.await?; + + let x = if let Some(nodes) = r { + let items = nodes + .into_iter() + .filter_map(|x| x.series) + .flatten() + .map(|x| -> serde_json::Value { ColVals(x.columns, x.values).into() }) + .map(|x| -> Result, Error> { + let x = serde_json::from_value(x)?; + + Ok(x) + }) + .collect::>, _>>()?; + + Some(Ok(items.into_iter().flatten().collect())) + } else { + None + }; + + x.transpose() + } + .boxed() + } +} #[derive(serde::Deserialize, Clone, Debug)] pub struct InfluxResponse { @@ -34,10 +91,10 @@ impl From for serde_json::Value { .map(|y| -> Map { cols.clone().into_iter().zip(y).collect() }) - .map(Value::Object) + .map(serde_json::Value::Object) .collect(); - Value::Array(xs) + serde_json::Value::Array(xs) } }