Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

override default healthcheck support #35

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- Add `MySqlConnectionManager::with_custom_health_check()` constructor.

## 24.0.0

- Update `mysql` dependency to `24`.
Expand Down
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,22 @@ fn main() {
}
}
```

### Custom Health Check

If in case for some reason your server don't support `SELECT version()` you can override the default healthcheck function:

```rust
use std::{env, sync::Arc, thread};
use mysql::{prelude::*, Conn, Error, Opts, OptsBuilder};

fn health_check(_: MySqlConnectionManager, conn: &mut Conn) -> Result<(), Error> {
conn.query("SELECT 1").map(|_: Vec<String>| ())
}

fn main() {
// [ .. ]
let manager = MySqlConnectionManager::with_custom_health_check(builder, health_check);
// [ .. ]
}
```
56 changes: 55 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,27 @@
//! let _ = th.join();
//! }
//! ```
//!
//! # Custom Health Check
//! ```
//! # use r2d2_mysql::{
//! # mysql::{prelude::*, *},
//! # MySqlConnectionManager,
//! # };
//! fn health_check(
//! _: MySqlConnectionManager,
//! conn: &mut mysql::Conn
//! ) -> Result<(), mysql::Error> {
//! conn.query("SELECT 1").map(|_: Vec<String>| ())
//! }
//!
//! // ...
//!
//! # let url = std::env::var("DATABASE_URL").unwrap();
//! # let opts = mysql::Opts::from_url(&url).unwrap();
//! # let builder = OptsBuilder::from_opts(opts);
//! let manager = MySqlConnectionManager::with_custom_health_check(builder, health_check);
//! ```

pub use mysql;
pub use r2d2;
Expand All @@ -46,7 +67,7 @@ pub use self::pool::MySqlConnectionManager;
mod test {
use std::{env, sync::Arc, thread};

use mysql::{prelude::*, Opts, OptsBuilder};
use mysql::{prelude::*, Conn, Error, Opts, OptsBuilder};

use super::MySqlConnectionManager;

Expand Down Expand Up @@ -78,4 +99,37 @@ mod test {
let _ = th.join();
}
}

#[test]
fn query_pool_with_custom_health_check() {
fn health_check(_: MySqlConnectionManager, conn: &mut Conn) -> Result<(), Error> {
conn.query("SELECT 1").map(|_: Vec<String>| ())
}

let url = env::var("DATABASE_URL").unwrap();
let opts = Opts::from_url(&url).unwrap();
let builder = OptsBuilder::from_opts(opts);
let manager = MySqlConnectionManager::with_custom_health_check(builder, health_check);
let pool = Arc::new(r2d2::Pool::builder().max_size(4).build(manager).unwrap());

let mut tasks = vec![];

for _ in 0..3 {
let pool = pool.clone();
let th = thread::spawn(move || {
let mut conn = pool.get().expect("error getting connection from pool");

let _ = conn
.query("SELECT 1")
.map(|rows: Vec<String>| rows.is_empty())
.expect("error executing query");
});

tasks.push(th);
}

for th in tasks {
let _ = th.join();
}
}
}
41 changes: 41 additions & 0 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,43 @@
//!
//! See [`MySqlConnectionManager`].

use std::fmt;

use mysql::{error::Error, prelude::*, Conn, Opts, OptsBuilder};

/// A type for custom health check function.
type HealthCheckFn = fn(MySqlConnectionManager, &mut Conn) -> Result<(), Error>;

/// A wrapper for ease of applying custom health check function.
#[derive(Clone)]
struct HealthCheckFnWrapper {
function: HealthCheckFn,
}

/// An [`r2d2`] connection manager for [`mysql`] connections.
#[derive(Clone, Debug)]
pub struct MySqlConnectionManager {
params: Opts,
health_check_fn: Option<HealthCheckFnWrapper>,
}

impl MySqlConnectionManager {
/// Constructs a new MySQL connection manager from `params`.
pub fn new(params: OptsBuilder) -> MySqlConnectionManager {
MySqlConnectionManager {
params: Opts::from(params),
health_check_fn: None,
}
}

/// Constructs a new MySQL connection manager from `params` with custom health check function.
pub fn with_custom_health_check(
params: OptsBuilder,
health_check_fn: HealthCheckFn,
) -> MySqlConnectionManager {
MySqlConnectionManager {
params: Opts::from(params),
health_check_fn: Some(HealthCheckFnWrapper::new(health_check_fn)),
}
}
}
Expand All @@ -28,10 +52,27 @@ impl r2d2::ManageConnection for MySqlConnectionManager {
}

fn is_valid(&self, conn: &mut Conn) -> Result<(), Error> {
if let Some(health_check_fn) = self.health_check_fn.clone() {
return (health_check_fn.function)(self.clone(), conn);
}
conn.query("SELECT version()").map(|_: Vec<String>| ())
}

fn has_broken(&self, conn: &mut Conn) -> bool {
self.is_valid(conn).is_err()
}
}

impl HealthCheckFnWrapper {
/// Constructs a new health check function wrapper from `function`.
fn new(function: HealthCheckFn) -> Self {
Self { function }
}
}

impl fmt::Debug for HealthCheckFnWrapper {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("HealthCheckFnWrapper")
.finish_non_exhaustive()
}
}
Loading