diff --git a/.travis.yml b/.travis.yml index 7de78c8..171cfbb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -29,6 +29,7 @@ rust: - stable - beta - nightly + - 1.39.0 matrix: fast_finish: true @@ -37,8 +38,8 @@ matrix: - rust: stable env: NAME='linting' before_script: - - rustup component add rustfmt-preview - - rustup component add clippy-preview + - rustup component add rustfmt + - rustup component add clippy script: - cargo fmt --all -- --check - cargo clippy --all-targets --all-features -- -D warnings diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a41798..557a5e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Rewrite to `async` / `await`. Rust 1.39 is now the minimum required Rust version. + ## [0.0.5] - 2019-08-16 This release removes the prefix `InfluxDb` of most types in this library and reexports the types under the `influxdb::` path. In most cases, you can directly use the types now: e.g. `influxdb::Client` vs `influxdb::client::InfluxDbClient`. diff --git a/Cargo.toml b/Cargo.toml index 2b2b74d..ce24b1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,15 +14,17 @@ repository = "https://github.com/Empty2k12/influxdb-rust" travis-ci = { repository = "Empty2k12/influxdb-rust", branch = "master" } [dependencies] -reqwest = "0.9.17" -futures = "0.1.27" -tokio = "0.1.20" +chrono = { version = "0.4.9", optional = true } failure = "0.1.5" -serde = { version = "1.0.92", optional = true } +futures = "0.3.1" +reqwest = { version = "0.10", features = ["json"] } +serde = { version = "1.0.92", features = ["derive"], optional = true } serde_json = { version = "1.0", optional = true } -chrono = { version = "0.4.9", optional = true } [features] use-serde = ["serde", "serde_json"] chrono_timestamps = ["chrono"] default = ["use-serde"] + +[dev-dependencies] +tokio = { version = "0.2", features = ["macros"] } diff --git a/README.md b/README.md index 91d8309..b196c83 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,9 @@ Build with Rust + + Minimum Rust Version +

This library is a work in progress. Although we've been using it in production at [OpenVelo](https://openvelo.org/), @@ -54,8 +57,6 @@ For an example with using Serde deserialization, please refer to [serde_integrat ```rust use influxdb::{Client, Query, Timestamp}; -use serde::Deserialize; -use tokio::runtime::current_thread::Runtime; // Create a Client with URL `http://localhost:8086` // and database name `test` @@ -67,21 +68,15 @@ let client = Client::new("http://localhost:8086", "test"); let write_query = Query::write_query(Timestamp::Now, "weather") .add_field("temperature", 82); -// Since this library is async by default, we're going to need a Runtime, -// which can asynchonously run our query. -// The [tokio](https://crates.io/crates/tokio) crate lets us easily create a new Runtime. -let mut rt = Runtime::new().expect("Unable to create a runtime"); - -// To actually submit the data to InfluxDB, the `block_on` method can be used to -// halt execution of our program until it has been completed. -let write_result = rt.block_on(client.query(&write_query)); +// Submit the query to InfluxDB. +let write_result = client.query(&write_query).await; assert!(write_result.is_ok(), "Write result was not okay"); // Reading data is as simple as writing. First we need to create a query let read_query = Query::raw_read_query("SELECT * FROM weather"); -// Again, we're blocking until the request is done -let read_result = rt.block_on(client.query(&read_query)); +// submit the request and wait until it's done +let read_result = client.query(&read_query).await; assert!(read_result.is_ok(), "Read result was not ok"); diff --git a/README.tpl b/README.tpl index 3ecb796..633da45 100644 --- a/README.tpl +++ b/README.tpl @@ -22,6 +22,9 @@ Build with Rust + + Minimum Rust Version +

{{readme}} diff --git a/src/client/mod.rs b/src/client/mod.rs index d5f6432..2911f85 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -15,11 +15,8 @@ //! assert_eq!(client.database_name(), "test"); //! ``` -use futures::{Future, Stream}; -use reqwest::r#async::{Client as ReqwestClient, Decoder}; -use reqwest::{StatusCode, Url}; - -use std::mem; +use futures::prelude::*; +use reqwest::{self, Client as ReqwestClient, StatusCode, Url}; use crate::query::QueryTypes; use crate::Error; @@ -130,29 +127,27 @@ impl Client { /// Pings the InfluxDB Server /// /// Returns a tuple of build type and version number - pub fn ping(&self) -> impl Future + Send { - ReqwestClient::new() - .get(format!("{}/ping", self.url).as_str()) - .send() - .map(|res| { - let build = res - .headers() - .get("X-Influxdb-Build") - .unwrap() - .to_str() - .unwrap(); - let version = res - .headers() - .get("X-Influxdb-Version") - .unwrap() - .to_str() - .unwrap(); - - (String::from(build), String::from(version)) - }) + pub async fn ping(&self) -> Result<(String, String), Error> { + let res = reqwest::get(format!("{}/ping", self.url).as_str()) + .await .map_err(|err| Error::ProtocolError { error: format!("{}", err), - }) + })?; + + let build = res + .headers() + .get("X-Influxdb-Build") + .unwrap() + .to_str() + .unwrap(); + let version = res + .headers() + .get("X-Influxdb-Version") + .unwrap() + .to_str() + .unwrap(); + + Ok((build.to_owned(), version.to_owned())) } /// Sends a [`ReadQuery`](crate::ReadQuery) or [`WriteQuery`](crate::WriteQuery) to the InfluxDB Server. @@ -165,14 +160,17 @@ impl Client { /// /// # Examples /// - /// ```rust + /// ```rust,no_run /// use influxdb::{Client, Query, Timestamp}; /// + /// # #[tokio::main] + /// # async fn main() -> Result<(), failure::Error> { /// let client = Client::new("http://localhost:8086", "test"); - /// let _future = client.query( - /// &Query::write_query(Timestamp::Now, "weather") - /// .add_field("temperature", 82) - /// ); + /// let query = Query::write_query(Timestamp::Now, "weather") + /// .add_field("temperature", 82); + /// let results = client.query(&query).await?; + /// # Ok(()) + /// # } /// ``` /// # Errors /// @@ -180,41 +178,29 @@ impl Client { /// a [`Error`] variant will be returned. /// /// [`Error`]: enum.Error.html - pub fn query<'q, Q>(&self, q: &'q Q) -> Box + Send> + pub async fn query<'q, Q>(&self, q: &'q Q) -> Result where Q: Query, &'q Q: Into>, { - use futures::future; - - let query = match q.build() { - Err(err) => { - let error = Error::InvalidQueryError { - error: format!("{}", err), - }; - return Box::new(future::err::(error)); - } - Ok(query) => query, - }; + let query = q.build().map_err(|err| Error::InvalidQueryError { + error: format!("{}", err), + })?; let basic_parameters: Vec<(String, String)> = self.into(); let client = match q.into() { QueryTypes::Read(_) => { let read_query = query.get(); - let mut url = match Url::parse_with_params( + let mut url = Url::parse_with_params( format!("{url}/query", url = self.database_url()).as_str(), basic_parameters, - ) { - Ok(url) => url, - Err(err) => { - let error = Error::UrlConstructionError { - error: format!("{}", err), - }; - return Box::new(future::err::(error)); - } - }; - url.query_pairs_mut().append_pair("q", &read_query.clone()); + ) + .map_err(|err| Error::UrlConstructionError { + error: format!("{}", err), + })?; + + url.query_pairs_mut().append_pair("q", &read_query); if read_query.contains("SELECT") || read_query.contains("SHOW") { ReqwestClient::new().get(url) @@ -223,65 +209,44 @@ impl Client { } } QueryTypes::Write(write_query) => { - let mut url = match Url::parse_with_params( + let mut url = Url::parse_with_params( format!("{url}/write", url = self.database_url()).as_str(), basic_parameters, - ) { - Ok(url) => url, - Err(err) => { - let error = Error::InvalidQueryError { - error: format!("{}", err), - }; - return Box::new(future::err::(error)); - } - }; + ) + .map_err(|err| Error::InvalidQueryError { + error: format!("{}", err), + })?; + url.query_pairs_mut() .append_pair("precision", &write_query.get_precision()); + ReqwestClient::new().post(url).body(query.get()) } }; - Box::new( - client - .send() - .map_err(|err| Error::ConnectionError { error: err }) - .and_then( - |res| -> future::FutureResult { - match res.status() { - StatusCode::UNAUTHORIZED => { - futures::future::err(Error::AuthorizationError) - } - StatusCode::FORBIDDEN => { - futures::future::err(Error::AuthenticationError) - } - _ => futures::future::ok(res), - } - }, - ) - .and_then(|mut res| { - let body = mem::replace(res.body_mut(), Decoder::empty()); - body.concat2().map_err(|err| Error::ProtocolError { - error: format!("{}", err), - }) - }) - .and_then(|body| { - if let Ok(utf8) = std::str::from_utf8(&body) { - let s = utf8.to_owned(); - - // todo: improve error parsing without serde - if s.contains("\"error\"") { - return futures::future::err(Error::DatabaseError { - error: format!("influxdb error: \"{}\"", s), - }); - } - - return futures::future::ok(s); - } - - futures::future::err(Error::DeserializationError { - error: "response could not be converted to UTF-8".to_string(), - }) - }), - ) + + let res = client + .send() + .map_err(|err| Error::ConnectionError { error: err }) + .await?; + + match res.status() { + StatusCode::UNAUTHORIZED => return Err(Error::AuthorizationError), + StatusCode::FORBIDDEN => return Err(Error::AuthenticationError), + _ => {} + } + + let s = res.text().await.map_err(|_| Error::DeserializationError { + error: "response could not be converted to UTF-8".to_string(), + })?; + + // todo: improve error parsing without serde + if s.contains("\"error\"") { + return Err(Error::DatabaseError { + error: format!("influxdb error: \"{}\"", s), + }); + } + + Ok(s) } } diff --git a/src/integrations/serde_integration.rs b/src/integrations/serde_integration.rs index ef048d9..1c941a1 100644 --- a/src/integrations/serde_integration.rs +++ b/src/integrations/serde_integration.rs @@ -22,46 +22,37 @@ //! weather: WeatherWithoutCityName, //! } //! -//! let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap(); +//! # #[tokio::main] +//! # async fn main() -> Result<(), failure::Error> { //! let client = Client::new("http://localhost:8086", "test"); //! let query = Query::raw_read_query( //! "SELECT temperature FROM /weather_[a-z]*$/ WHERE time > now() - 1m ORDER BY DESC", //! ); -//! let _result = rt -//! .block_on(client.json_query(query)) -//! .map(|mut db_result| db_result.deserialize_next::()) -//! .map(|it| { -//! it.map(|series_vec| { -//! series_vec -//! .series -//! .into_iter() -//! .map(|mut city_series| { -//! let city_name = -//! city_series.name.split("_").collect::>().remove(2); -//! Weather { -//! weather: city_series.values.remove(0), -//! city_name: city_name.to_string(), -//! } -//! }) -//! .collect::>() -//! }) -//! }); +//! let mut db_result = client.json_query(query).await?; +//! let _result = db_result +//! .deserialize_next::()? +//! .series +//! .into_iter() +//! .map(|mut city_series| { +//! let city_name = +//! city_series.name.split("_").collect::>().remove(2); +//! Weather { +//! weather: city_series.values.remove(0), +//! city_name: city_name.to_string(), +//! } +//! }) +//! .collect::>(); +//! # Ok(()) +//! # } //! ``` -use serde::de::DeserializeOwned; +use reqwest::{Client as ReqwestClient, StatusCode, Url}; -use futures::{Future, Stream}; -use reqwest::r#async::{Client as ReqwestClient, Decoder}; -use reqwest::{StatusCode, Url}; -use std::mem; - -use serde::Deserialize; +use serde::{de::DeserializeOwned, Deserialize}; use serde_json; use crate::{Client, Error, Query, ReadQuery}; -use futures::future::Either; - #[derive(Deserialize)] #[doc(hidden)] struct _DatabaseError { @@ -75,18 +66,15 @@ pub struct DatabaseQueryResult { } impl DatabaseQueryResult { - pub fn deserialize_next( - &mut self, - ) -> impl Future, Error = Error> + Send + pub fn deserialize_next(&mut self) -> Result, Error> where T: DeserializeOwned + Send, { - match serde_json::from_value::>(self.results.remove(0)) { - Ok(item) => futures::future::result(Ok(item)), - Err(err) => futures::future::err(Error::DeserializationError { + serde_json::from_value::>(self.results.remove(0)).map_err(|err| { + Error::DeserializationError { error: format!("could not deserialize: {}", err), - }), - } + } + }) } } @@ -104,12 +92,7 @@ pub struct Series { } impl Client { - pub fn json_query( - &self, - q: ReadQuery, - ) -> impl Future + Send { - use futures::future; - + pub async fn json_query(&self, q: ReadQuery) -> Result { let query = q.build().unwrap(); let basic_parameters: Vec<(String, String)> = self.into(); let client = { @@ -124,68 +107,48 @@ impl Client { let error = Error::UrlConstructionError { error: format!("{}", err), }; - return Either::B(future::err::(error)); + return Err(error); } }; url.query_pairs_mut().append_pair("q", &read_query.clone()); - if read_query.contains("SELECT") || read_query.contains("SHOW") { - ReqwestClient::new().get(url.as_str()) - } else { + if !read_query.contains("SELECT") && !read_query.contains("SHOW") { let error = Error::InvalidQueryError { error: String::from( "Only SELECT and SHOW queries supported with JSON deserialization", ), }; - return Either::B(future::err::(error)); + return Err(error); } + + ReqwestClient::new().get(url.as_str()) }; - Either::A( - client - .send() - .map_err(|err| Error::ConnectionError { error: err }) - .and_then( - |res| -> future::FutureResult { - match res.status() { - StatusCode::UNAUTHORIZED => { - futures::future::err(Error::AuthorizationError) - } - StatusCode::FORBIDDEN => { - futures::future::err(Error::AuthenticationError) - } - _ => futures::future::ok(res), - } - }, - ) - .and_then(|mut res| { - let body = mem::replace(res.body_mut(), Decoder::empty()); - body.concat2().map_err(|err| Error::ProtocolError { - error: format!("{}", err), - }) - }) - .and_then(|body| { - // Try parsing InfluxDBs { "error": "error message here" } - if let Ok(error) = serde_json::from_slice::<_DatabaseError>(&body) { - futures::future::err(Error::DatabaseError { - error: error.error.to_string(), - }) - } else { - // Json has another structure, let's try actually parsing it to the type we're deserializing - let from_slice = serde_json::from_slice::(&body); - - let deserialized = match from_slice { - Ok(deserialized) => deserialized, - Err(err) => { - return futures::future::err(Error::DeserializationError { - error: format!("serde error: {}", err), - }) - } - }; - - futures::future::result(Ok(deserialized)) - } - }), - ) + let res = client + .send() + .await + .map_err(|err| Error::ConnectionError { error: err })?; + + match res.status() { + StatusCode::UNAUTHORIZED => return Err(Error::AuthorizationError), + StatusCode::FORBIDDEN => return Err(Error::AuthenticationError), + _ => {} + } + + let body = res.bytes().await.map_err(|err| Error::ProtocolError { + error: format!("{}", err), + })?; + + // Try parsing InfluxDBs { "error": "error message here" } + if let Ok(error) = serde_json::from_slice::<_DatabaseError>(&body) { + return Err(Error::DatabaseError { error: error.error }); + } + + // Json has another structure, let's try actually parsing it to the type we're deserializing + serde_json::from_slice::(&body).map_err(|err| { + Error::DeserializationError { + error: format!("serde error: {}", err), + } + }) } } diff --git a/src/lib.rs b/src/lib.rs index c9bbf7f..5e531e4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,9 +28,9 @@ //! //! ```rust,no_run //! use influxdb::{Client, Query, Timestamp}; -//! use serde::Deserialize; -//! use tokio::runtime::current_thread::Runtime; //! +//! # #[tokio::main] +//! # async fn main() { //! // Create a Client with URL `http://localhost:8086` //! // and database name `test` //! let client = Client::new("http://localhost:8086", "test"); @@ -41,27 +41,22 @@ //! let write_query = Query::write_query(Timestamp::Now, "weather") //! .add_field("temperature", 82); //! -//! // Since this library is async by default, we're going to need a Runtime, -//! // which can asynchonously run our query. -//! // The [tokio](https://crates.io/crates/tokio) crate lets us easily create a new Runtime. -//! let mut rt = Runtime::new().expect("Unable to create a runtime"); -//! -//! // To actually submit the data to InfluxDB, the `block_on` method can be used to -//! // halt execution of our program until it has been completed. -//! let write_result = rt.block_on(client.query(&write_query)); +//! // Submit the query to InfluxDB. +//! let write_result = client.query(&write_query).await; //! assert!(write_result.is_ok(), "Write result was not okay"); //! //! // Reading data is as simple as writing. First we need to create a query //! let read_query = Query::raw_read_query("SELECT * FROM weather"); //! -//! // Again, we're blocking until the request is done -//! let read_result = rt.block_on(client.query(&read_query)); +//! // submit the request and wait until it's done +//! let read_result = client.query(&read_query).await; //! //! assert!(read_result.is_ok(), "Read result was not ok"); //! //! // We can be sure the result was successful, so we can unwrap the result to get //! // the response String from InfluxDB //! println!("{}", read_result.unwrap()); +//! # } //! ``` //! //! For further examples, check out the Integration Tests in `tests/integration_tests.rs` @@ -72,6 +67,8 @@ //! [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) //! +#![allow(clippy::needless_doctest_main)] + #[macro_use] extern crate failure; diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 1370e15..97ee96b 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1,10 +1,9 @@ extern crate influxdb; use futures::prelude::*; -use influxdb::Client; -use influxdb::Error; -use influxdb::{Query, Timestamp}; -use tokio::runtime::current_thread::Runtime; +use influxdb::{Client, Error, Query, Timestamp}; +use std::panic::{AssertUnwindSafe, UnwindSafe}; +use tokio; fn assert_result_err(result: &Result) { result.as_ref().expect_err("assert_result_err failed"); @@ -14,10 +13,6 @@ fn assert_result_ok(result: &Result Runtime { - Runtime::new().expect("Unable to create a runtime") -} - fn create_client(db_name: T) -> Client where T: Into, @@ -25,41 +20,50 @@ where Client::new("http://localhost:8086", db_name) } -struct RunOnDrop { - closure: Box ()>, -} - -impl Drop for RunOnDrop { - fn drop(&mut self) { - (self.closure)(); - } -} - -fn create_db(name: T) -> Result +async fn create_db(name: T) -> Result where T: Into, { let test_name = name.into(); let query = format!("CREATE DATABASE {}", test_name); - get_runtime().block_on(create_client(test_name).query(&Query::raw_read_query(query))) + create_client(test_name) + .query(&Query::raw_read_query(query)) + .await } -fn delete_db(name: T) -> Result +async fn delete_db(name: T) -> Result where T: Into, { let test_name = name.into(); let query = format!("DROP DATABASE {}", test_name); - get_runtime().block_on(create_client(test_name).query(&Query::raw_read_query(query))) + create_client(test_name) + .query(&Query::raw_read_query(query)) + .await +} + +async fn run_test(test_fn: F, teardown: T) +where + F: FnOnce() -> Fut1 + UnwindSafe, + T: FnOnce() -> Fut2, + Fut1: Future, + Fut2: Future, +{ + let test_result = AssertUnwindSafe(test_fn()).catch_unwind().await; + AssertUnwindSafe(teardown()) + .catch_unwind() + .await + .expect("failed teardown"); + test_result.expect("failed test"); } -#[test] /// INTEGRATION TEST /// /// This test case tests whether the InfluxDB server can be connected to and gathers info about it -fn test_ping_influx_db() { +#[tokio::test] +async fn test_ping_influx_db() { let client = create_client("notusedhere"); - let result = get_runtime().block_on(client.ping()); + let result = client.ping().await; assert_result_ok(&result); let (build, version) = result.unwrap(); @@ -69,16 +73,16 @@ fn test_ping_influx_db() { println!("build: {} version: {}", build, version); } -#[test] /// INTEGRATION TEST /// /// This test case tests connection error -fn test_connection_error() { +#[tokio::test] +async fn test_connection_error() { let test_name = "test_connection_error"; let client = Client::new("http://localhost:10086", test_name).with_auth("nopriv_user", "password"); let read_query = Query::raw_read_query("SELECT * FROM weather"); - let read_result = get_runtime().block_on(client.query(&read_query)); + let read_result = client.query(&read_query).await; assert_result_err(&read_result); match read_result { Err(Error::ConnectionError { .. }) => {} @@ -89,409 +93,474 @@ fn test_connection_error() { } } -#[test] /// INTEGRATION TEST /// /// This test case tests the Authentication -fn test_authed_write_and_read() { - let test_name = "test_authed_write_and_read"; - let client = Client::new("http://localhost:9086", test_name).with_auth("admin", "password"); - let query = format!("CREATE DATABASE {}", test_name); - get_runtime() - .block_on(client.query(&Query::raw_read_query(query))) - .expect("could not setup db"); - - let _run_on_drop = RunOnDrop { - closure: Box::new(|| { - let test_name = "test_authed_write_and_read"; - let client = - Client::new("http://localhost:9086", test_name).with_auth("admin", "password"); - let query = format!("DROP DATABASE {}", test_name); - get_runtime() - .block_on(client.query(&Query::raw_read_query(query))) - .expect("could not clean up db"); - }), - }; - - let client = Client::new("http://localhost:9086", test_name).with_auth("admin", "password"); - let write_query = - Query::write_query(Timestamp::Hours(11), "weather").add_field("temperature", 82); - let write_result = get_runtime().block_on(client.query(&write_query)); - assert_result_ok(&write_result); - - let read_query = Query::raw_read_query("SELECT * FROM weather"); - let read_result = get_runtime().block_on(client.query(&read_query)); - assert_result_ok(&read_result); - assert!( - !read_result.unwrap().contains("error"), - "Data contained a database error" - ); +#[tokio::test] +async fn test_authed_write_and_read() { + const TEST_NAME: &str = "test_authed_write_and_read"; + + run_test( + || { + async move { + let client = + Client::new("http://localhost:9086", TEST_NAME).with_auth("admin", "password"); + let query = format!("CREATE DATABASE {}", TEST_NAME); + client + .query(&Query::raw_read_query(query)) + .await + .expect("could not setup db"); + + let client = + Client::new("http://localhost:9086", TEST_NAME).with_auth("admin", "password"); + let write_query = Query::write_query(Timestamp::Hours(11), "weather") + .add_field("temperature", 82); + let write_result = client.query(&write_query).await; + assert_result_ok(&write_result); + + let read_query = Query::raw_read_query("SELECT * FROM weather"); + let read_result = client.query(&read_query).await; + assert_result_ok(&read_result); + assert!( + !read_result.unwrap().contains("error"), + "Data contained a database error" + ); + } + }, + || { + async move { + let client = + Client::new("http://localhost:9086", TEST_NAME).with_auth("admin", "password"); + let query = format!("DROP DATABASE {}", TEST_NAME); + + client + .query(&Query::raw_read_query(query)) + .await + .expect("could not clean up db"); + } + }, + ) + .await; } -#[test] /// INTEGRATION TEST /// /// This test case tests the Authentication -fn test_wrong_authed_write_and_read() { - let test_name = "test_wrong_authed_write_and_read"; - let client = Client::new("http://localhost:9086", test_name).with_auth("admin", "password"); - let query = format!("CREATE DATABASE {}", test_name); - get_runtime() - .block_on(client.query(&Query::raw_read_query(query))) - .expect("could not setup db"); - - let _run_on_drop = RunOnDrop { - closure: Box::new(|| { - let test_name = "test_wrong_authed_write_and_read"; - let client = - Client::new("http://localhost:9086", test_name).with_auth("admin", "password"); - let query = format!("DROP DATABASE {}", test_name); - get_runtime() - .block_on(client.query(&Query::raw_read_query(query))) - .expect("could not clean up db"); - }), - }; - - let client = - Client::new("http://localhost:9086", test_name).with_auth("wrong_user", "password"); - let write_query = - Query::write_query(Timestamp::Hours(11), "weather").add_field("temperature", 82); - let write_result = get_runtime().block_on(client.query(&write_query)); - assert_result_err(&write_result); - match write_result { - Err(Error::AuthorizationError) => {} - _ => panic!(format!( - "Should be an AuthorizationError: {}", - write_result.unwrap_err() - )), - } - - let read_query = Query::raw_read_query("SELECT * FROM weather"); - let read_result = get_runtime().block_on(client.query(&read_query)); - assert_result_err(&read_result); - match read_result { - Err(Error::AuthorizationError) => {} - _ => panic!(format!( - "Should be an AuthorizationError: {}", - read_result.unwrap_err() - )), - } - - let client = - Client::new("http://localhost:9086", test_name).with_auth("nopriv_user", "password"); - let read_query = Query::raw_read_query("SELECT * FROM weather"); - let read_result = get_runtime().block_on(client.query(&read_query)); - assert_result_err(&read_result); - match read_result { - Err(Error::AuthenticationError) => {} - _ => panic!(format!( - "Should be an AuthenticationError: {}", - read_result.unwrap_err() - )), - } +#[tokio::test] +async fn test_wrong_authed_write_and_read() { + const TEST_NAME: &str = "test_wrong_authed_write_and_read"; + + run_test( + || { + async move { + let client = + Client::new("http://localhost:9086", TEST_NAME).with_auth("admin", "password"); + let query = format!("CREATE DATABASE {}", TEST_NAME); + client + .query(&Query::raw_read_query(query)) + .await + .expect("could not setup db"); + + let client = Client::new("http://localhost:9086", TEST_NAME) + .with_auth("wrong_user", "password"); + let write_query = Query::write_query(Timestamp::Hours(11), "weather") + .add_field("temperature", 82); + let write_result = client.query(&write_query).await; + assert_result_err(&write_result); + match write_result { + Err(Error::AuthorizationError) => {} + _ => panic!(format!( + "Should be an AuthorizationError: {}", + write_result.unwrap_err() + )), + } + + let read_query = Query::raw_read_query("SELECT * FROM weather"); + let read_result = client.query(&read_query).await; + assert_result_err(&read_result); + match read_result { + Err(Error::AuthorizationError) => {} + _ => panic!(format!( + "Should be an AuthorizationError: {}", + read_result.unwrap_err() + )), + } + + let client = Client::new("http://localhost:9086", TEST_NAME) + .with_auth("nopriv_user", "password"); + let read_query = Query::raw_read_query("SELECT * FROM weather"); + let read_result = client.query(&read_query).await; + assert_result_err(&read_result); + match read_result { + Err(Error::AuthenticationError) => {} + _ => panic!(format!( + "Should be an AuthenticationError: {}", + read_result.unwrap_err() + )), + } + } + }, + || { + async move { + let client = + Client::new("http://localhost:9086", TEST_NAME).with_auth("admin", "password"); + let query = format!("DROP DATABASE {}", TEST_NAME); + client + .query(&Query::raw_read_query(query)) + .await + .expect("could not clean up db"); + } + }, + ) + .await; } -#[test] /// INTEGRATION TEST /// /// This test case tests the Authentication -fn test_non_authed_write_and_read() { - let test_name = "test_non_authed_write_and_read"; - let client = Client::new("http://localhost:9086", test_name).with_auth("admin", "password"); - let query = format!("CREATE DATABASE {}", test_name); - get_runtime() - .block_on(client.query(&Query::raw_read_query(query))) - .expect("could not setup db"); - - let _run_on_drop = RunOnDrop { - closure: Box::new(|| { - let test_name = "test_non_authed_write_and_read"; - let client = - Client::new("http://localhost:9086", test_name).with_auth("admin", "password"); - let query = format!("DROP DATABASE {}", test_name); - get_runtime() - .block_on(client.query(&Query::raw_read_query(query))) - .expect("could not clean up db"); - }), - }; - let non_authed_client = Client::new("http://localhost:9086", test_name); - let write_query = - Query::write_query(Timestamp::Hours(11), "weather").add_field("temperature", 82); - let write_result = get_runtime().block_on(non_authed_client.query(&write_query)); - assert_result_err(&write_result); - match write_result { - Err(Error::AuthorizationError) => {} - _ => panic!(format!( - "Should be an AuthorizationError: {}", - write_result.unwrap_err() - )), - } - - let read_query = Query::raw_read_query("SELECT * FROM weather"); - let read_result = get_runtime().block_on(non_authed_client.query(&read_query)); - assert_result_err(&read_result); - match read_result { - Err(Error::AuthorizationError) => {} - _ => panic!(format!( - "Should be an AuthorizationError: {}", - read_result.unwrap_err() - )), - } +#[tokio::test] +async fn test_non_authed_write_and_read() { + const TEST_NAME: &str = "test_non_authed_write_and_read"; + + run_test( + || { + async move { + let client = + Client::new("http://localhost:9086", TEST_NAME).with_auth("admin", "password"); + let query = format!("CREATE DATABASE {}", TEST_NAME); + client + .query(&Query::raw_read_query(query)) + .await + .expect("could not setup db"); + let non_authed_client = Client::new("http://localhost:9086", TEST_NAME); + let write_query = Query::write_query(Timestamp::Hours(11), "weather") + .add_field("temperature", 82); + let write_result = non_authed_client.query(&write_query).await; + assert_result_err(&write_result); + match write_result { + Err(Error::AuthorizationError) => {} + _ => panic!(format!( + "Should be an AuthorizationError: {}", + write_result.unwrap_err() + )), + } + + let read_query = Query::raw_read_query("SELECT * FROM weather"); + let read_result = non_authed_client.query(&read_query).await; + assert_result_err(&read_result); + match read_result { + Err(Error::AuthorizationError) => {} + _ => panic!(format!( + "Should be an AuthorizationError: {}", + read_result.unwrap_err() + )), + } + } + }, + || { + async move { + let client = + Client::new("http://localhost:9086", TEST_NAME).with_auth("admin", "password"); + let query = format!("DROP DATABASE {}", TEST_NAME); + client + .query(&Query::raw_read_query(query)) + .await + .expect("could not clean up db"); + } + }, + ) + .await; } -#[test] /// INTEGRATION TEST /// /// This integration tests that writing data and retrieving the data again is working -fn test_write_and_read_field() { - let test_name = "test_write_field"; - create_db(test_name).expect("could not setup db"); - let _run_on_drop = RunOnDrop { - closure: Box::new(|| { - delete_db("test_write_field").expect("could not clean up db"); - }), - }; - - let client = create_client(test_name); - let write_query = - Query::write_query(Timestamp::Hours(11), "weather").add_field("temperature", 82); - let write_result = get_runtime().block_on(client.query(&write_query)); - assert_result_ok(&write_result); - - let read_query = Query::raw_read_query("SELECT * FROM weather"); - let read_result = get_runtime().block_on(client.query(&read_query)); - assert_result_ok(&read_result); - assert!( - !read_result.unwrap().contains("error"), - "Data contained a database error" - ); - - delete_db(test_name).expect("could not clean up db"); +#[tokio::test] +async fn test_write_and_read_field() { + const TEST_NAME: &str = "test_write_field"; + + run_test( + || { + async move { + create_db(TEST_NAME).await.expect("could not setup db"); + let client = create_client(TEST_NAME); + let write_query = Query::write_query(Timestamp::Hours(11), "weather") + .add_field("temperature", 82); + let write_result = client.query(&write_query).await; + assert_result_ok(&write_result); + + let read_query = Query::raw_read_query("SELECT * FROM weather"); + let read_result = client.query(&read_query).await; + assert_result_ok(&read_result); + assert!( + !read_result.unwrap().contains("error"), + "Data contained a database error" + ); + } + }, + || { + async move { + delete_db(TEST_NAME).await.expect("could not clean up db"); + } + }, + ) + .await; } -#[test] -#[cfg(feature = "use-serde")] /// INTEGRATION TEST /// /// This integration tests that writing data and retrieving the data again is working -fn test_write_and_read_option() { +#[tokio::test] +#[cfg(feature = "use-serde")] +async fn test_write_and_read_option() { use serde::Deserialize; - let test_name = "test_write_and_read_option"; - create_db(test_name).expect("could not setup db"); - let _run_on_drop = RunOnDrop { - closure: Box::new(|| { - delete_db("test_write_and_read_option").expect("could not clean up db"); - }), - }; - - let client = create_client(test_name); - // Todo: Convert this to derive based insert for easier comparison of structs - let write_query = Query::write_query(Timestamp::Hours(11), "weather") - .add_field("temperature", 82) - .add_field("wind_strength", >::None); - let write_result = get_runtime().block_on(client.query(&write_query)); - assert_result_ok(&write_result); - - #[derive(Deserialize, Debug, PartialEq)] - struct Weather { - time: String, - temperature: i32, - wind_strength: Option, - } - - let query = Query::raw_read_query("SELECT time, temperature, wind_strength FROM weather"); - let future = client - .json_query(query) - .and_then(|mut db_result| db_result.deserialize_next::()); - let result = get_runtime().block_on(future); - assert_result_ok(&result); - assert_eq!( - result.unwrap().series[0].values[0], - Weather { - time: "1970-01-01T11:00:00Z".to_string(), - temperature: 82, - wind_strength: None, - } - ); - delete_db(test_name).expect("could not clean up db"); + const TEST_NAME: &str = "test_write_and_read_option"; + + run_test( + || { + async move { + create_db(TEST_NAME).await.expect("could not setup db"); + + let client = create_client(TEST_NAME); + // Todo: Convert this to derive based insert for easier comparison of structs + let write_query = Query::write_query(Timestamp::Hours(11), "weather") + .add_field("temperature", 82) + .add_field("wind_strength", >::None); + let write_result = client.query(&write_query).await; + assert_result_ok(&write_result); + + #[derive(Deserialize, Debug, PartialEq)] + struct Weather { + time: String, + temperature: i32, + wind_strength: Option, + } + + let query = + Query::raw_read_query("SELECT time, temperature, wind_strength FROM weather"); + let result = client + .json_query(query) + .await + .and_then(|mut db_result| db_result.deserialize_next::()); + assert_result_ok(&result); + + assert_eq!( + result.unwrap().series[0].values[0], + Weather { + time: "1970-01-01T11:00:00Z".to_string(), + temperature: 82, + wind_strength: None, + } + ); + } + }, + || { + async move { + delete_db("test_write_and_read_option") + .await + .expect("could not clean up db"); + } + }, + ) + .await; } -#[test] -#[cfg(feature = "use-serde")] /// INTEGRATION TEST /// /// This test case tests whether JSON can be decoded from a InfluxDB response and whether that JSON /// is equal to the data which was written to the database -fn test_json_query() { +#[tokio::test] +#[cfg(feature = "use-serde")] +async fn test_json_query() { use serde::Deserialize; - let test_name = "test_json_query"; - create_db(test_name).expect("could not setup db"); - let _run_on_drop = RunOnDrop { - closure: Box::new(|| { - delete_db("test_json_query").expect("could not clean up db"); - }), - }; - - let client = create_client(test_name); - - // todo: implement deriving so objects can easily be placed in InfluxDB - let write_query = - Query::write_query(Timestamp::Hours(11), "weather").add_field("temperature", 82); - let write_result = get_runtime().block_on(client.query(&write_query)); - assert_result_ok(&write_result); - - #[derive(Deserialize, Debug, PartialEq)] - struct Weather { - time: String, - temperature: i32, - } - - let query = Query::raw_read_query("SELECT * FROM weather"); - let future = client - .json_query(query) - .and_then(|mut db_result| db_result.deserialize_next::()); - let result = get_runtime().block_on(future); - assert_result_ok(&result); - - assert_eq!( - result.unwrap().series[0].values[0], - Weather { - time: "1970-01-01T11:00:00Z".to_string(), - temperature: 82 - } - ); - - delete_db(test_name).expect("could not clean up db"); + const TEST_NAME: &str = "test_json_query"; + + run_test( + || { + async move { + create_db(TEST_NAME).await.expect("could not setup db"); + + let client = create_client(TEST_NAME); + + // todo: implement deriving so objects can easily be placed in InfluxDB + let write_query = Query::write_query(Timestamp::Hours(11), "weather") + .add_field("temperature", 82); + let write_result = client.query(&write_query).await; + assert_result_ok(&write_result); + + #[derive(Deserialize, Debug, PartialEq)] + struct Weather { + time: String, + temperature: i32, + } + + let query = Query::raw_read_query("SELECT * FROM weather"); + let result = client + .json_query(query) + .await + .and_then(|mut db_result| db_result.deserialize_next::()); + assert_result_ok(&result); + + assert_eq!( + result.unwrap().series[0].values[0], + Weather { + time: "1970-01-01T11:00:00Z".to_string(), + temperature: 82 + } + ); + } + }, + || { + async move { + delete_db(TEST_NAME).await.expect("could not clean up db"); + } + }, + ) + .await; } -#[test] -#[cfg(feature = "use-serde")] /// INTEGRATION TEST /// /// This test case tests whether JSON can be decoded from a InfluxDB response and wether that JSON /// is equal to the data which was written to the database -fn test_json_query_vec() { +#[tokio::test] +#[cfg(feature = "use-serde")] +async fn test_json_query_vec() { use serde::Deserialize; - let test_name = "test_json_query_vec"; - create_db(test_name).expect("could not setup db"); - let _run_on_drop = RunOnDrop { - closure: Box::new(|| { - delete_db("test_json_query_vec").expect("could not clean up db"); - }), - }; - - let client = create_client(test_name); - let write_query1 = - Query::write_query(Timestamp::Hours(11), "temperature_vec").add_field("temperature", 16); - let write_query2 = - Query::write_query(Timestamp::Hours(12), "temperature_vec").add_field("temperature", 17); - let write_query3 = - Query::write_query(Timestamp::Hours(13), "temperature_vec").add_field("temperature", 18); - - let _write_result = get_runtime().block_on(client.query(&write_query1)); - let _write_result2 = get_runtime().block_on(client.query(&write_query2)); - let _write_result2 = get_runtime().block_on(client.query(&write_query3)); - - #[derive(Deserialize, Debug, PartialEq)] - struct Weather { - time: String, - temperature: i32, - } - - let query = Query::raw_read_query("SELECT * FROM temperature_vec"); - let future = client - .json_query(query) - .and_then(|mut db_result| db_result.deserialize_next::()); - let result = get_runtime().block_on(future); - assert_result_ok(&result); - assert_eq!(result.unwrap().series[0].values.len(), 3); - - delete_db(test_name).expect("could not clean up db"); + const TEST_NAME: &str = "test_json_query_vec"; + + run_test( + || { + async move { + create_db(TEST_NAME).await.expect("could not setup db"); + + let client = create_client(TEST_NAME); + let write_query1 = Query::write_query(Timestamp::Hours(11), "temperature_vec") + .add_field("temperature", 16); + let write_query2 = Query::write_query(Timestamp::Hours(12), "temperature_vec") + .add_field("temperature", 17); + let write_query3 = Query::write_query(Timestamp::Hours(13), "temperature_vec") + .add_field("temperature", 18); + + let _write_result = client.query(&write_query1).await; + let _write_result2 = client.query(&write_query2).await; + let _write_result2 = client.query(&write_query3).await; + + #[derive(Deserialize, Debug, PartialEq)] + struct Weather { + time: String, + temperature: i32, + } + + let query = Query::raw_read_query("SELECT * FROM temperature_vec"); + let result = client + .json_query(query) + .await + .and_then(|mut db_result| db_result.deserialize_next::()); + assert_result_ok(&result); + assert_eq!(result.unwrap().series[0].values.len(), 3); + } + }, + || { + async move { + delete_db(TEST_NAME).await.expect("could not clean up db"); + } + }, + ) + .await; } -#[test] -#[cfg(feature = "use-serde")] /// INTEGRATION TEST /// /// This integration test tests whether using the wrong query method fails building the query -fn test_serde_multi_query() { +#[tokio::test] +#[cfg(feature = "use-serde")] +async fn test_serde_multi_query() { use serde::Deserialize; - let test_name = "test_serde_multi_query"; - create_db(test_name).expect("could not setup db"); - let _run_on_drop = RunOnDrop { - closure: Box::new(|| { - delete_db("test_serde_multi_query").expect("could not clean up db"); - }), - }; - - #[derive(Deserialize, Debug, PartialEq)] - struct Temperature { - time: String, - temperature: i32, - } - - #[derive(Deserialize, Debug, PartialEq)] - struct Humidity { - time: String, - humidity: i32, - } - - let client = create_client(test_name); - let write_query = - Query::write_query(Timestamp::Hours(11), "temperature").add_field("temperature", 16); - let write_query2 = - Query::write_query(Timestamp::Hours(11), "humidity").add_field("humidity", 69); - - let write_result = get_runtime().block_on(client.query(&write_query)); - let write_result2 = get_runtime().block_on(client.query(&write_query2)); - assert_result_ok(&write_result); - assert_result_ok(&write_result2); - - let future = client - .json_query( - Query::raw_read_query("SELECT * FROM temperature").add_query("SELECT * FROM humidity"), - ) - .and_then(|mut db_result| { - let temp = db_result.deserialize_next::(); - let humidity = db_result.deserialize_next::(); - - (temp, humidity) - }); - let result = get_runtime().block_on(future); - assert_result_ok(&result); - - let (temp, humidity) = result.unwrap(); - assert_eq!( - temp.series[0].values[0], - Temperature { - time: "1970-01-01T11:00:00Z".to_string(), - temperature: 16 + const TEST_NAME: &str = "test_serde_multi_query"; + + run_test( + || { + async move { + create_db(TEST_NAME).await.expect("could not setup db"); + + #[derive(Deserialize, Debug, PartialEq)] + struct Temperature { + time: String, + temperature: i32, + } + + #[derive(Deserialize, Debug, PartialEq)] + struct Humidity { + time: String, + humidity: i32, + } + + let client = create_client(TEST_NAME); + let write_query = Query::write_query(Timestamp::Hours(11), "temperature") + .add_field("temperature", 16); + let write_query2 = + Query::write_query(Timestamp::Hours(11), "humidity").add_field("humidity", 69); + + let write_result = client.query(&write_query).await; + let write_result2 = client.query(&write_query2).await; + assert_result_ok(&write_result); + assert_result_ok(&write_result2); + + let result = client + .json_query( + Query::raw_read_query("SELECT * FROM temperature") + .add_query("SELECT * FROM humidity"), + ) + .await + .and_then(|mut db_result| { + let temp = db_result.deserialize_next::()?; + let humidity = db_result.deserialize_next::()?; + + Ok((temp, humidity)) + }); + assert_result_ok(&result); + + let (temp, humidity) = result.unwrap(); + assert_eq!( + temp.series[0].values[0], + Temperature { + time: "1970-01-01T11:00:00Z".to_string(), + temperature: 16 + }, + ); + assert_eq!( + humidity.series[0].values[0], + Humidity { + time: "1970-01-01T11:00:00Z".to_string(), + humidity: 69 + } + ); + } }, - ); - assert_eq!( - humidity.series[0].values[0], - Humidity { - time: "1970-01-01T11:00:00Z".to_string(), - humidity: 69 - } - ); - - delete_db(test_name).expect("could not clean up db"); + || { + async move { + delete_db(TEST_NAME).await.expect("could not clean up db"); + } + }, + ) + .await; } -#[test] -#[cfg(feature = "use-serde")] /// INTEGRATION TEST /// /// This integration test tests whether using the wrong query method fails building the query -fn test_wrong_query_errors() { +#[tokio::test] +#[cfg(feature = "use-serde")] +async fn test_wrong_query_errors() { let client = create_client("test_name"); - let future = client.json_query(Query::raw_read_query("CREATE DATABASE this_should_fail")); - let result = get_runtime().block_on(future); + let result = client + .json_query(Query::raw_read_query("CREATE DATABASE this_should_fail")) + .await; assert!( result.is_err(), "Should only build SELECT and SHOW queries."