Skip to content

Replace a few types with more developer-friendly names #182

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,16 +215,16 @@ where

let mut res = BytesMut::new();
res.put(row_description(&columns));
for ((pool_name, username), pool) in get_all_pools() {
for (user_pool, pool) in get_all_pools() {
let def = HashMap::default();
let pool_stats = all_pool_stats
.get(&(pool_name.clone(), username.clone()))
.get(&(user_pool.db.clone(), user_pool.user.clone()))
.unwrap_or(&def);

let pool_config = &pool.settings;
let mut row = vec![
pool_name.clone(),
username.clone(),
user_pool.db.clone(),
user_pool.user.clone(),
pool_config.pool_mode.to_string(),
];
for column in &columns[3..columns.len()] {
Expand Down Expand Up @@ -420,7 +420,7 @@ where
let mut res = BytesMut::new();
res.put(row_description(&columns));

for ((db, username), pool) in get_all_pools() {
for (user_pool, pool) in get_all_pools() {
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
let address = pool.address(shard, server);
Expand All @@ -429,7 +429,7 @@ where
None => HashMap::new(),
};

let mut row = vec![address.name(), db.clone(), username.clone()];
let mut row = vec![address.name(), user_pool.db.clone(), user_pool.user.clone()];
for column in &columns[3..] {
row.push(stats.get(column.0).unwrap_or(&0).to_string());
}
Expand Down
4 changes: 2 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ where
}
// Authenticate normal user.
else {
let pool = match get_pool(pool_name.clone(), username.clone()) {
let pool = match get_pool(&pool_name, &username) {
Some(pool) => pool,
None => {
error_response(
Expand Down Expand Up @@ -648,7 +648,7 @@ where
// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
// when starting a query.
let pool = match get_pool(self.pool_name.clone(), self.username.clone()) {
let pool = match get_pool(&self.pool_name, &self.username) {
Some(pool) => pool,
None => {
error_response(
Expand Down
47 changes: 38 additions & 9 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,43 @@ use crate::server::Server;
use crate::sharding::ShardingFunction;
use crate::stats::{get_reporter, Reporter};

pub type ProcessId = i32;
pub type SecretKey = i32;
pub type ServerHost = String;
pub type ServerPort = u16;

pub type BanList = Arc<RwLock<Vec<HashMap<Address, NaiveDateTime>>>>;
pub type ClientServerMap = Arc<Mutex<HashMap<(i32, i32), (i32, i32, String, u16)>>>;
pub type PoolMap = HashMap<(String, String), ConnectionPool>;
pub type ClientServerMap =
Arc<Mutex<HashMap<(ProcessId, SecretKey), (ProcessId, SecretKey, ServerHost, ServerPort)>>>;
pub type PoolMap = HashMap<PoolIdentifier, ConnectionPool>;
/// The connection pool, globally available.
/// This is atomic and safe and read-optimized.
/// The pool is recreated dynamically when the config is reloaded.
pub static POOLS: Lazy<ArcSwap<PoolMap>> = Lazy::new(|| ArcSwap::from_pointee(HashMap::default()));
static POOLS_HASH: Lazy<ArcSwap<HashSet<crate::config::Pool>>> =
Lazy::new(|| ArcSwap::from_pointee(HashSet::default()));

/// An identifier for a PgCat pool,
/// a database visible to clients.
#[derive(Hash, Debug, Clone, PartialEq, Eq)]
pub struct PoolIdentifier {
// The name of the database clients want to connect to.
pub db: String,

/// The username the client connects with. Each user gets its own pool.
pub user: String,
}

impl PoolIdentifier {
/// Create a new user/pool identifier.
pub fn new(db: &str, user: &str) -> PoolIdentifier {
PoolIdentifier {
db: db.to_string(),
user: user.to_string(),
}
}
}

/// Pool settings.
#[derive(Clone, Debug)]
pub struct PoolSettings {
Expand Down Expand Up @@ -113,14 +140,16 @@ impl ConnectionPool {
// If the pool hasn't changed, get existing reference and insert it into the new_pools.
// We replace all pools at the end, but if the reference is kept, the pool won't get re-created (bb8).
if !changed {
match get_pool(pool_name.clone(), user.username.clone()) {
match get_pool(&pool_name, &user.username) {
Some(pool) => {
info!(
"[pool: {}][user: {}] has not changed",
pool_name, user.username
);
new_pools
.insert((pool_name.clone(), user.username.clone()), pool.clone());
new_pools.insert(
PoolIdentifier::new(&pool_name, &user.username),
pool.clone(),
);
continue;
}
None => (),
Expand Down Expand Up @@ -239,7 +268,7 @@ impl ConnectionPool {
};

// There is one pool per database/user pair.
new_pools.insert((pool_name.clone(), user.username.clone()), pool);
new_pools.insert(PoolIdentifier::new(&pool_name, &user.username), pool);
}
}

Expand Down Expand Up @@ -603,15 +632,15 @@ impl ManageConnection for ServerPool {
}

/// Get the connection pool
pub fn get_pool(db: String, user: String) -> Option<ConnectionPool> {
match get_all_pools().get(&(db, user)) {
pub fn get_pool(db: &str, user: &str) -> Option<ConnectionPool> {
match get_all_pools().get(&PoolIdentifier::new(&db, &user)) {
Some(pool) => Some(pool.clone()),
None => None,
}
}

/// Get a pointer to all configured pools.
pub fn get_all_pools() -> HashMap<(String, String), ConnectionPool> {
pub fn get_all_pools() -> HashMap<PoolIdentifier, ConnectionPool> {
return (*(*POOLS.load())).clone();
}

Expand Down
6 changes: 3 additions & 3 deletions src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,11 +521,11 @@ impl Collector {
tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD / 15));
loop {
interval.tick().await;
for ((pool_name, username), _pool) in get_all_pools() {
for (user_pool, _) in get_all_pools() {
let _ = tx.try_send(Event {
name: EventName::UpdateStats {
pool_name,
username,
pool_name: user_pool.db,
username: user_pool.user,
},
value: 0,
});
Expand Down