Skip to content

Commit

Permalink
Use async await (#45)
Browse files Browse the repository at this point in the history
* Use async await

* Fix integration tests

* Update the readme to use async await

* Indicate minimum required rust version

1.39 due to async / await

* Replace RunOnDrop with manual test harness function

* Update reqwest to v0.10

* Enable missing tokio features

* Fix error handling

* Fix docs

* Add minimum Rust version to test suite

* Document rewrite to async / await

* Fix clippy errors

* Allow main functions in docs

* Reformat code

* Use stable clippy and rustfmt
  • Loading branch information
NeoLegends authored and Empty2k12 committed Jan 4, 2020
1 parent 04d1fc8 commit 1b0cae1
Show file tree
Hide file tree
Showing 9 changed files with 606 additions and 607 deletions.
5 changes: 3 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ rust:
- stable
- beta
- nightly
- 1.39.0

matrix:
fast_finish: true
Expand All @@ -37,8 +38,8 @@ matrix:
- rust: stable
env: NAME='linting'
before_script:
- rustup component add rustfmt-preview
- rustup component add clippy-preview
- rustup component add rustfmt
- rustup component add clippy
script:
- cargo fmt --all -- --check
- cargo clippy --all-targets --all-features -- -D warnings
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- Rewrite to `async` / `await`. Rust 1.39 is now the minimum required Rust version.

## [0.0.5] - 2019-08-16

This release removes the prefix `InfluxDb` of most types in this library and reexports the types under the `influxdb::` path. In most cases, you can directly use the types now: e.g. `influxdb::Client` vs `influxdb::client::InfluxDbClient`.
Expand Down
12 changes: 7 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ repository = "https://github.com/Empty2k12/influxdb-rust"
travis-ci = { repository = "Empty2k12/influxdb-rust", branch = "master" }

[dependencies]
reqwest = "0.9.17"
futures = "0.1.27"
tokio = "0.1.20"
chrono = { version = "0.4.9", optional = true }
failure = "0.1.5"
serde = { version = "1.0.92", optional = true }
futures = "0.3.1"
reqwest = { version = "0.10", features = ["json"] }
serde = { version = "1.0.92", features = ["derive"], optional = true }
serde_json = { version = "1.0", optional = true }
chrono = { version = "0.4.9", optional = true }

[features]
use-serde = ["serde", "serde_json"]
chrono_timestamps = ["chrono"]
default = ["use-serde"]

[dev-dependencies]
tokio = { version = "0.2", features = ["macros"] }
19 changes: 7 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
<a href="https://www.rust-lang.org/en-US/">
<img src="https://img.shields.io/badge/Made%20with-Rust-orange.svg" alt='Build with Rust' />
</a>
<a href="https://blog.rust-lang.org/2019/11/07/Rust-1.39.0.html">
<img src="https://img.shields.io/badge/rustc-1.39+-yellow.svg" alt='Minimum Rust Version' />
</a>
</p>

This library is a work in progress. Although we've been using it in production at [OpenVelo](https://openvelo.org/),
Expand Down Expand Up @@ -54,8 +57,6 @@ For an example with using Serde deserialization, please refer to [serde_integrat

```rust
use influxdb::{Client, Query, Timestamp};
use serde::Deserialize;
use tokio::runtime::current_thread::Runtime;

// Create a Client with URL `http://localhost:8086`
// and database name `test`
Expand All @@ -67,21 +68,15 @@ let client = Client::new("http://localhost:8086", "test");
let write_query = Query::write_query(Timestamp::Now, "weather")
.add_field("temperature", 82);

// Since this library is async by default, we're going to need a Runtime,
// which can asynchonously run our query.
// The [tokio](https://crates.io/crates/tokio) crate lets us easily create a new Runtime.
let mut rt = Runtime::new().expect("Unable to create a runtime");

// To actually submit the data to InfluxDB, the `block_on` method can be used to
// halt execution of our program until it has been completed.
let write_result = rt.block_on(client.query(&write_query));
// Submit the query to InfluxDB.
let write_result = client.query(&write_query).await;
assert!(write_result.is_ok(), "Write result was not okay");

// Reading data is as simple as writing. First we need to create a query
let read_query = Query::raw_read_query("SELECT * FROM weather");

// Again, we're blocking until the request is done
let read_result = rt.block_on(client.query(&read_query));
// submit the request and wait until it's done
let read_result = client.query(&read_query).await;

assert!(read_result.is_ok(), "Read result was not ok");

Expand Down
3 changes: 3 additions & 0 deletions README.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
<a href="https://www.rust-lang.org/en-US/">
<img src="https://img.shields.io/badge/Made%20with-Rust-orange.svg" alt='Build with Rust' />
</a>
<a href="https://blog.rust-lang.org/2019/11/07/Rust-1.39.0.html">
<img src="https://img.shields.io/badge/rustc-1.39+-yellow.svg" alt='Minimum Rust Version' />
</a>
</p>

{{readme}}
Expand Down
177 changes: 71 additions & 106 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
//! assert_eq!(client.database_name(), "test");
//! ```
use futures::{Future, Stream};
use reqwest::r#async::{Client as ReqwestClient, Decoder};
use reqwest::{StatusCode, Url};

use std::mem;
use futures::prelude::*;
use reqwest::{self, Client as ReqwestClient, StatusCode, Url};

use crate::query::QueryTypes;
use crate::Error;
Expand Down Expand Up @@ -130,29 +127,27 @@ impl Client {
/// Pings the InfluxDB Server
///
/// Returns a tuple of build type and version number
pub fn ping(&self) -> impl Future<Item = (String, String), Error = Error> + Send {
ReqwestClient::new()
.get(format!("{}/ping", self.url).as_str())
.send()
.map(|res| {
let build = res
.headers()
.get("X-Influxdb-Build")
.unwrap()
.to_str()
.unwrap();
let version = res
.headers()
.get("X-Influxdb-Version")
.unwrap()
.to_str()
.unwrap();

(String::from(build), String::from(version))
})
pub async fn ping(&self) -> Result<(String, String), Error> {
let res = reqwest::get(format!("{}/ping", self.url).as_str())
.await
.map_err(|err| Error::ProtocolError {
error: format!("{}", err),
})
})?;

let build = res
.headers()
.get("X-Influxdb-Build")
.unwrap()
.to_str()
.unwrap();
let version = res
.headers()
.get("X-Influxdb-Version")
.unwrap()
.to_str()
.unwrap();

Ok((build.to_owned(), version.to_owned()))
}

/// Sends a [`ReadQuery`](crate::ReadQuery) or [`WriteQuery`](crate::WriteQuery) to the InfluxDB Server.
Expand All @@ -165,56 +160,47 @@ impl Client {
///
/// # Examples
///
/// ```rust
/// ```rust,no_run
/// use influxdb::{Client, Query, Timestamp};
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), failure::Error> {
/// let client = Client::new("http://localhost:8086", "test");
/// let _future = client.query(
/// &Query::write_query(Timestamp::Now, "weather")
/// .add_field("temperature", 82)
/// );
/// let query = Query::write_query(Timestamp::Now, "weather")
/// .add_field("temperature", 82);
/// let results = client.query(&query).await?;
/// # Ok(())
/// # }
/// ```
/// # Errors
///
/// If the function can not finish the query,
/// a [`Error`] variant will be returned.
///
/// [`Error`]: enum.Error.html
pub fn query<'q, Q>(&self, q: &'q Q) -> Box<dyn Future<Item = String, Error = Error> + Send>
pub async fn query<'q, Q>(&self, q: &'q Q) -> Result<String, Error>
where
Q: Query,
&'q Q: Into<QueryTypes<'q>>,
{
use futures::future;

let query = match q.build() {
Err(err) => {
let error = Error::InvalidQueryError {
error: format!("{}", err),
};
return Box::new(future::err::<String, Error>(error));
}
Ok(query) => query,
};
let query = q.build().map_err(|err| Error::InvalidQueryError {
error: format!("{}", err),
})?;

let basic_parameters: Vec<(String, String)> = self.into();

let client = match q.into() {
QueryTypes::Read(_) => {
let read_query = query.get();
let mut url = match Url::parse_with_params(
let mut url = Url::parse_with_params(
format!("{url}/query", url = self.database_url()).as_str(),
basic_parameters,
) {
Ok(url) => url,
Err(err) => {
let error = Error::UrlConstructionError {
error: format!("{}", err),
};
return Box::new(future::err::<String, Error>(error));
}
};
url.query_pairs_mut().append_pair("q", &read_query.clone());
)
.map_err(|err| Error::UrlConstructionError {
error: format!("{}", err),
})?;

url.query_pairs_mut().append_pair("q", &read_query);

if read_query.contains("SELECT") || read_query.contains("SHOW") {
ReqwestClient::new().get(url)
Expand All @@ -223,65 +209,44 @@ impl Client {
}
}
QueryTypes::Write(write_query) => {
let mut url = match Url::parse_with_params(
let mut url = Url::parse_with_params(
format!("{url}/write", url = self.database_url()).as_str(),
basic_parameters,
) {
Ok(url) => url,
Err(err) => {
let error = Error::InvalidQueryError {
error: format!("{}", err),
};
return Box::new(future::err::<String, Error>(error));
}
};
)
.map_err(|err| Error::InvalidQueryError {
error: format!("{}", err),
})?;

url.query_pairs_mut()
.append_pair("precision", &write_query.get_precision());

ReqwestClient::new().post(url).body(query.get())
}
};
Box::new(
client
.send()
.map_err(|err| Error::ConnectionError { error: err })
.and_then(
|res| -> future::FutureResult<reqwest::r#async::Response, Error> {
match res.status() {
StatusCode::UNAUTHORIZED => {
futures::future::err(Error::AuthorizationError)
}
StatusCode::FORBIDDEN => {
futures::future::err(Error::AuthenticationError)
}
_ => futures::future::ok(res),
}
},
)
.and_then(|mut res| {
let body = mem::replace(res.body_mut(), Decoder::empty());
body.concat2().map_err(|err| Error::ProtocolError {
error: format!("{}", err),
})
})
.and_then(|body| {
if let Ok(utf8) = std::str::from_utf8(&body) {
let s = utf8.to_owned();

// todo: improve error parsing without serde
if s.contains("\"error\"") {
return futures::future::err(Error::DatabaseError {
error: format!("influxdb error: \"{}\"", s),
});
}

return futures::future::ok(s);
}

futures::future::err(Error::DeserializationError {
error: "response could not be converted to UTF-8".to_string(),
})
}),
)

let res = client
.send()
.map_err(|err| Error::ConnectionError { error: err })
.await?;

match res.status() {
StatusCode::UNAUTHORIZED => return Err(Error::AuthorizationError),
StatusCode::FORBIDDEN => return Err(Error::AuthenticationError),
_ => {}
}

let s = res.text().await.map_err(|_| Error::DeserializationError {
error: "response could not be converted to UTF-8".to_string(),
})?;

// todo: improve error parsing without serde
if s.contains("\"error\"") {
return Err(Error::DatabaseError {
error: format!("influxdb error: \"{}\"", s),
});
}

Ok(s)
}
}

Expand Down
Loading

0 comments on commit 1b0cae1

Please sign in to comment.