forked from mocsy/actix_db
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlib.rs
109 lines (98 loc) · 3.13 KB
/
lib.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
use diesel::prelude::*;
use diesel::r2d2;
use diesel::r2d2::{ ConnectionManager, Pool };
use diesel::result::Error;
use diesel::query_builder::SelectQuery;
use ::actix::prelude::*;
use diesel::query_dsl::LoadQuery;
use std::marker::PhantomData;
use std::env;
use dotenv::dotenv;
use log::debug;
pub struct DbExecutor<T: 'static> where T: Connection {
pub pool: Pool<ConnectionManager<T>>
}
impl<T: Connection> Actor for DbExecutor<T> {
type Context = SyncContext<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
debug!("I am alive!");
}
}
pub struct SQuery<S,I> {
pub select : S,
pub phantom: PhantomData<I>,
}
impl<S,I:'static> Message for SQuery<S,I>{
type Result = Result<Vec<I>, Error>;
}
impl<S: LoadQuery<PgConnection, I>+SelectQuery,I:'static> Handler<SQuery<S,I>> for DbExecutor<PgConnection> {
type Result = Result<Vec<I>, Error>;
fn handle(&mut self, msg: SQuery<S,I>, _: &mut Self::Context) -> Self::Result {
let pool = &self.pool;
debug!("handle Queries");
if let Ok(conn) = pool.get() {
let res = msg.select.load::<I>(&conn);
return res;
}
Ok(Vec::new())
}
}
pub struct WQuery<W,I> {
pub query : W,
pub phantom: PhantomData<I>,
}
impl<W,I:'static> Message for WQuery<W,I>{
type Result = Result<Vec<I>, Error>;
}
impl<W: LoadQuery<PgConnection, I>,I:'static> Handler<WQuery<W,I>> for DbExecutor<PgConnection> {
type Result = Result<Vec<I>, Error>;
fn handle(&mut self, msg: WQuery<W,I>, _: &mut Self::Context) -> Self::Result {
let pool = &self.pool;
debug!("handle Queries");
if let Ok(conn) = pool.get() {
let res = msg.query.get_results::<I>(&conn);
return res;
}
Ok(Vec::new())
}
}
pub struct AppState {
pub rdb: Addr<DbExecutor<diesel::PgConnection>>,
pub wdb: Addr<DbExecutor<diesel::PgConnection>>,
}
/// Use the Read setting with a connection String to access a read-only db replica
/// Use the Write setting to udpate with a connection String to a writeable DB
pub enum ConnectionType {
Read,
Write,
}
pub fn db_setup(conn_type: ConnectionType) -> actix::Addr<DbExecutor<diesel::PgConnection>> {
dotenv().ok();
let var = match conn_type {
ConnectionType::Read => "DB_READ_URL",
ConnectionType::Write => "DB_WRITE_URL",
};
let database_url = env::var(var)
.unwrap_or_else(|_| panic!("{} must be set", var));
let manager = ConnectionManager::<PgConnection>::new(database_url);
let pool = r2d2::Pool::builder()
.build(manager)
.expect("Failed to create pool.");
SyncArbiter::start(3, move || DbExecutor{pool:pool.clone()})
}
#[derive(Debug)]
pub enum DbExecutorError {
DatabaseError(diesel::result::Error),
MailBoxError(actix::MailboxError),
Unknown,
}
impl From<diesel::result::Error> for DbExecutorError {
fn from(error: diesel::result::Error) -> Self {
DbExecutorError::DatabaseError(error)
}
}
impl From<actix::MailboxError> for DbExecutorError {
fn from(error: actix::MailboxError) -> Self {
DbExecutorError::MailBoxError(error)
}
}