Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use async await #45

Merged
merged 15 commits into from
Jan 4, 2020
5 changes: 3 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ rust:
- stable
- beta
- nightly
- 1.39.0

matrix:
fast_finish: true
Expand All @@ -37,8 +38,8 @@ matrix:
- rust: stable
env: NAME='linting'
before_script:
- rustup component add rustfmt-preview
- rustup component add clippy-preview
- rustup component add rustfmt
- rustup component add clippy
script:
- cargo fmt --all -- --check
- cargo clippy --all-targets --all-features -- -D warnings
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- Rewrite to `async` / `await`. Rust 1.39 is now the minimum required Rust version.

## [0.0.5] - 2019-08-16

This release removes the prefix `InfluxDb` of most types in this library and reexports the types under the `influxdb::` path. In most cases, you can directly use the types now: e.g. `influxdb::Client` vs `influxdb::client::InfluxDbClient`.
Expand Down
12 changes: 7 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ repository = "https://github.com/Empty2k12/influxdb-rust"
travis-ci = { repository = "Empty2k12/influxdb-rust", branch = "master" }

[dependencies]
reqwest = "0.9.17"
futures = "0.1.27"
tokio = "0.1.20"
chrono = { version = "0.4.9", optional = true }
failure = "0.1.5"
serde = { version = "1.0.92", optional = true }
futures = "0.3.1"
reqwest = { version = "0.10", features = ["json"] }
serde = { version = "1.0.92", features = ["derive"], optional = true }
serde_json = { version = "1.0", optional = true }
chrono = { version = "0.4.9", optional = true }

[features]
use-serde = ["serde", "serde_json"]
chrono_timestamps = ["chrono"]
default = ["use-serde"]

[dev-dependencies]
tokio = { version = "0.2", features = ["macros"] }
19 changes: 7 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
<a href="https://www.rust-lang.org/en-US/">
<img src="https://img.shields.io/badge/Made%20with-Rust-orange.svg" alt='Build with Rust' />
</a>
<a href="https://blog.rust-lang.org/2019/11/07/Rust-1.39.0.html">
<img src="https://img.shields.io/badge/rustc-1.39+-yellow.svg" alt='Minimum Rust Version' />
</a>
</p>

This library is a work in progress. Although we've been using it in production at [OpenVelo](https://openvelo.org/),
Expand Down Expand Up @@ -54,8 +57,6 @@ For an example with using Serde deserialization, please refer to [serde_integrat

```rust
use influxdb::{Client, Query, Timestamp};
use serde::Deserialize;
use tokio::runtime::current_thread::Runtime;

// Create a Client with URL `http://localhost:8086`
// and database name `test`
Expand All @@ -67,21 +68,15 @@ let client = Client::new("http://localhost:8086", "test");
let write_query = Query::write_query(Timestamp::Now, "weather")
.add_field("temperature", 82);

// Since this library is async by default, we're going to need a Runtime,
// which can asynchonously run our query.
// The [tokio](https://crates.io/crates/tokio) crate lets us easily create a new Runtime.
let mut rt = Runtime::new().expect("Unable to create a runtime");

// To actually submit the data to InfluxDB, the `block_on` method can be used to
// halt execution of our program until it has been completed.
let write_result = rt.block_on(client.query(&write_query));
// Submit the query to InfluxDB.
let write_result = client.query(&write_query).await;
assert!(write_result.is_ok(), "Write result was not okay");

// Reading data is as simple as writing. First we need to create a query
let read_query = Query::raw_read_query("SELECT * FROM weather");

// Again, we're blocking until the request is done
let read_result = rt.block_on(client.query(&read_query));
// submit the request and wait until it's done
let read_result = client.query(&read_query).await;

assert!(read_result.is_ok(), "Read result was not ok");

Expand Down
3 changes: 3 additions & 0 deletions README.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
<a href="https://www.rust-lang.org/en-US/">
<img src="https://img.shields.io/badge/Made%20with-Rust-orange.svg" alt='Build with Rust' />
</a>
<a href="https://blog.rust-lang.org/2019/11/07/Rust-1.39.0.html">
<img src="https://img.shields.io/badge/rustc-1.39+-yellow.svg" alt='Minimum Rust Version' />
</a>
</p>

{{readme}}
Expand Down
177 changes: 71 additions & 106 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
//! assert_eq!(client.database_name(), "test");
//! ```
use futures::{Future, Stream};
use reqwest::r#async::{Client as ReqwestClient, Decoder};
use reqwest::{StatusCode, Url};

use std::mem;
use futures::prelude::*;
use reqwest::{self, Client as ReqwestClient, StatusCode, Url};

use crate::query::QueryTypes;
use crate::Error;
Expand Down Expand Up @@ -130,29 +127,27 @@ impl Client {
/// Pings the InfluxDB Server
///
/// Returns a tuple of build type and version number
pub fn ping(&self) -> impl Future<Item = (String, String), Error = Error> + Send {
ReqwestClient::new()
.get(format!("{}/ping", self.url).as_str())
.send()
.map(|res| {
let build = res
.headers()
.get("X-Influxdb-Build")
.unwrap()
.to_str()
.unwrap();
let version = res
.headers()
.get("X-Influxdb-Version")
.unwrap()
.to_str()
.unwrap();

(String::from(build), String::from(version))
})
pub async fn ping(&self) -> Result<(String, String), Error> {
let res = reqwest::get(format!("{}/ping", self.url).as_str())
.await
.map_err(|err| Error::ProtocolError {
error: format!("{}", err),
})
})?;

let build = res
.headers()
.get("X-Influxdb-Build")
.unwrap()
.to_str()
.unwrap();
let version = res
.headers()
.get("X-Influxdb-Version")
.unwrap()
.to_str()
.unwrap();

Ok((build.to_owned(), version.to_owned()))
}

/// Sends a [`ReadQuery`](crate::ReadQuery) or [`WriteQuery`](crate::WriteQuery) to the InfluxDB Server.
Expand All @@ -165,56 +160,47 @@ impl Client {
///
/// # Examples
///
/// ```rust
/// ```rust,no_run
/// use influxdb::{Client, Query, Timestamp};
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), failure::Error> {
/// let client = Client::new("http://localhost:8086", "test");
/// let _future = client.query(
/// &Query::write_query(Timestamp::Now, "weather")
/// .add_field("temperature", 82)
/// );
/// let query = Query::write_query(Timestamp::Now, "weather")
/// .add_field("temperature", 82);
/// let results = client.query(&query).await?;
/// # Ok(())
/// # }
/// ```
/// # Errors
///
/// If the function can not finish the query,
/// a [`Error`] variant will be returned.
///
/// [`Error`]: enum.Error.html
pub fn query<'q, Q>(&self, q: &'q Q) -> Box<dyn Future<Item = String, Error = Error> + Send>
pub async fn query<'q, Q>(&self, q: &'q Q) -> Result<String, Error>
where
Q: Query,
&'q Q: Into<QueryTypes<'q>>,
{
use futures::future;

let query = match q.build() {
Err(err) => {
let error = Error::InvalidQueryError {
error: format!("{}", err),
};
return Box::new(future::err::<String, Error>(error));
}
Ok(query) => query,
};
let query = q.build().map_err(|err| Error::InvalidQueryError {
error: format!("{}", err),
})?;

let basic_parameters: Vec<(String, String)> = self.into();

let client = match q.into() {
QueryTypes::Read(_) => {
let read_query = query.get();
let mut url = match Url::parse_with_params(
let mut url = Url::parse_with_params(
format!("{url}/query", url = self.database_url()).as_str(),
basic_parameters,
) {
Ok(url) => url,
Err(err) => {
let error = Error::UrlConstructionError {
error: format!("{}", err),
};
return Box::new(future::err::<String, Error>(error));
}
};
url.query_pairs_mut().append_pair("q", &read_query.clone());
)
.map_err(|err| Error::UrlConstructionError {
error: format!("{}", err),
})?;

url.query_pairs_mut().append_pair("q", &read_query);

if read_query.contains("SELECT") || read_query.contains("SHOW") {
ReqwestClient::new().get(url)
Expand All @@ -223,65 +209,44 @@ impl Client {
}
}
QueryTypes::Write(write_query) => {
let mut url = match Url::parse_with_params(
let mut url = Url::parse_with_params(
format!("{url}/write", url = self.database_url()).as_str(),
basic_parameters,
) {
Ok(url) => url,
Err(err) => {
let error = Error::InvalidQueryError {
error: format!("{}", err),
};
return Box::new(future::err::<String, Error>(error));
}
};
)
.map_err(|err| Error::InvalidQueryError {
error: format!("{}", err),
})?;

url.query_pairs_mut()
.append_pair("precision", &write_query.get_precision());

ReqwestClient::new().post(url).body(query.get())
}
};
Box::new(
client
.send()
.map_err(|err| Error::ConnectionError { error: err })
.and_then(
|res| -> future::FutureResult<reqwest::r#async::Response, Error> {
match res.status() {
StatusCode::UNAUTHORIZED => {
futures::future::err(Error::AuthorizationError)
}
StatusCode::FORBIDDEN => {
futures::future::err(Error::AuthenticationError)
}
_ => futures::future::ok(res),
}
},
)
.and_then(|mut res| {
let body = mem::replace(res.body_mut(), Decoder::empty());
body.concat2().map_err(|err| Error::ProtocolError {
error: format!("{}", err),
})
})
.and_then(|body| {
if let Ok(utf8) = std::str::from_utf8(&body) {
let s = utf8.to_owned();

// todo: improve error parsing without serde
if s.contains("\"error\"") {
return futures::future::err(Error::DatabaseError {
error: format!("influxdb error: \"{}\"", s),
});
}

return futures::future::ok(s);
}

futures::future::err(Error::DeserializationError {
error: "response could not be converted to UTF-8".to_string(),
})
}),
)

let res = client
.send()
.map_err(|err| Error::ConnectionError { error: err })
.await?;

match res.status() {
StatusCode::UNAUTHORIZED => return Err(Error::AuthorizationError),
StatusCode::FORBIDDEN => return Err(Error::AuthenticationError),
_ => {}
}

let s = res.text().await.map_err(|_| Error::DeserializationError {
error: "response could not be converted to UTF-8".to_string(),
})?;

// todo: improve error parsing without serde
if s.contains("\"error\"") {
return Err(Error::DatabaseError {
error: format!("influxdb error: \"{}\"", s),
});
}

Ok(s)
}
}

Expand Down
Loading