Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 2ece58d

Browse files
committed
wip manager
1 parent 09f7f09 commit 2ece58d

File tree

15 files changed

+553
-76
lines changed

15 files changed

+553
-76
lines changed

Cargo.lock

Lines changed: 256 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libsqlx-server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ color-eyre = "0.6.2"
1212
futures = "0.3.28"
1313
hyper = { version = "0.14.27", features = ["h2"] }
1414
libsqlx = { version = "0.1.0", path = "../libsqlx" }
15+
moka = { version = "0.11.2", features = ["future"] }
1516
serde = { version = "1.0.166", features = ["derive"] }
1617
tokio = { version = "1.29.1", features = ["full"] }
1718
tracing = "0.1.37"
Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,22 @@
1-
use serde::{Serialize, Deserialize};
1+
use serde::{Deserialize, Serialize};
22

3+
/// Structural supertype of AllocConfig, used for checking the meta version. Subsequent version of
4+
/// AllocConfig need to conform to this prototype.
35
#[derive(Debug, Serialize, Deserialize)]
4-
pub enum AllocConfig {
6+
struct ConfigVersion {
7+
config_version: u32,
8+
}
9+
10+
#[derive(Debug, Serialize, Deserialize)]
11+
pub struct AllocConfig {
12+
pub config_version: u32,
13+
pub max_conccurent_connection: u32,
14+
pub id: String,
15+
pub db_config: DbConfig,
16+
}
17+
18+
#[derive(Debug, Serialize, Deserialize)]
19+
pub enum DbConfig {
520
Primary { },
6-
Replica {
7-
primary_node_id: String,
8-
}
21+
Replica { primary_node_id: String },
922
}

libsqlx-server/src/allocation/mod.rs

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
11
use std::collections::HashMap;
2+
use std::path::PathBuf;
23

3-
use tokio::{sync::{mpsc, oneshot}, task::{JoinSet, block_in_place}};
4+
use libsqlx::Database as _;
5+
use libsqlx::libsql::{LibsqlDatabase, LogCompactor, LogFile, PrimaryType};
6+
use tokio::task::{block_in_place, JoinSet};
7+
use tokio::sync::{mpsc, oneshot};
8+
9+
use self::config::{AllocConfig, DbConfig};
410

511
pub mod config;
612

713
type ExecFn = Box<dyn FnOnce(&mut dyn libsqlx::Connection) + Send>;
814

915
#[derive(Clone)]
10-
struct ConnectionId {
16+
pub struct ConnectionId {
1117
id: u32,
1218
close_sender: mpsc::Sender<()>,
1319
}
1420

15-
enum AllocationMessage {
21+
pub enum AllocationMessage {
1622
/// Execute callback against connection
1723
Exec {
1824
connection_id: ConnectionId,
@@ -22,30 +28,61 @@ enum AllocationMessage {
2228
NewConnExec {
2329
exec: ExecFn,
2430
ret: oneshot::Sender<ConnectionId>,
25-
}
31+
},
32+
}
33+
34+
pub enum Database {
35+
Primary(libsqlx::libsql::LibsqlDatabase<PrimaryType>),
2636
}
2737

28-
enum Database {}
38+
struct Compactor;
39+
40+
impl LogCompactor for Compactor {
41+
fn should_compact(&self, _log: &LogFile) -> bool {
42+
false
43+
}
44+
45+
fn compact(
46+
&self,
47+
_log: LogFile,
48+
_path: std::path::PathBuf,
49+
_size_after: u32,
50+
) -> Result<(), Box<dyn std::error::Error + Sync + Send + 'static>> {
51+
todo!()
52+
}
53+
}
2954

3055
impl Database {
56+
pub fn from_config(config: &AllocConfig, path: PathBuf) -> Self {
57+
match config.db_config {
58+
DbConfig::Primary {} => {
59+
let db = LibsqlDatabase::new_primary(path, Compactor, false).unwrap();
60+
Self::Primary(db)
61+
}
62+
DbConfig::Replica { .. } => todo!(),
63+
}
64+
}
65+
3166
fn connect(&self) -> Box<dyn libsqlx::Connection + Send> {
32-
todo!();
67+
match self {
68+
Database::Primary(db) => Box::new(db.connect().unwrap()),
69+
}
3370
}
3471
}
3572

3673
pub struct Allocation {
37-
inbox: mpsc::Receiver<AllocationMessage>,
38-
database: Database,
74+
pub inbox: mpsc::Receiver<AllocationMessage>,
75+
pub database: Database,
3976
/// senders to the spawned connections
40-
connections: HashMap<u32, mpsc::Sender<ExecFn>>,
77+
pub connections: HashMap<u32, mpsc::Sender<ExecFn>>,
4178
/// spawned connection futures, returning their connection id on completion.
42-
connections_futs: JoinSet<u32>,
43-
next_conn_id: u32,
44-
max_concurrent_connections: u32,
79+
pub connections_futs: JoinSet<u32>,
80+
pub next_conn_id: u32,
81+
pub max_concurrent_connections: u32,
4582
}
4683

4784
impl Allocation {
48-
async fn run(mut self) {
85+
pub async fn run(mut self) {
4986
loop {
5087
tokio::select! {
5188
Some(msg) = self.inbox.recv() => {
@@ -86,23 +123,19 @@ impl Allocation {
86123
exec: exec_receiver,
87124
};
88125

89-
90126
self.connections_futs.spawn(conn.run());
91127
// This should never block!
92128
assert!(exec_sender.try_send(exec).is_ok());
93129
assert!(self.connections.insert(id, exec_sender).is_none());
94130

95-
ConnectionId {
96-
id,
97-
close_sender,
98-
}
131+
ConnectionId { id, close_sender }
99132
}
100133

101134
fn next_conn_id(&mut self) -> u32 {
102135
loop {
103136
self.next_conn_id = self.next_conn_id.wrapping_add(1);
104137
if !self.connections.contains_key(&self.next_conn_id) {
105-
return self.next_conn_id
138+
return self.next_conn_id;
106139
}
107140
}
108141
}

libsqlx-server/src/databases/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
use uuid::Uuid;
2+
3+
mod store;
4+
5+
pub type DatabaseId = Uuid;

libsqlx-server/src/databases/store.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
use std::collections::HashMap;
2+
3+
use super::DatabaseId;
4+
5+
pub enum Database {
6+
Replica,
7+
Primary,
8+
}
9+
10+
pub struct DatabaseManager {
11+
databases: HashMap<DatabaseId, Database>,
12+
}

libsqlx-server/src/http/admin.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use hyper::server::accept::Accept;
66
use serde::{Deserialize, Serialize};
77
use tokio::io::{AsyncRead, AsyncWrite};
88

9-
use crate::{meta::MetaStore, allocation::config::AllocConfig};
9+
use crate::{allocation::config::AllocConfig, meta::MetaStore};
1010

1111
pub struct AdminServerConfig {}
1212

@@ -22,6 +22,7 @@ where
2222
let state = AdminServerState {
2323
meta_store: todo!(),
2424
};
25+
2526
let app = Router::new()
2627
.route("/manage/allocation/create", post(allocate))
2728
.with_state(Arc::new(state));
@@ -36,7 +37,7 @@ where
3637
struct ErrorResponse {}
3738

3839
#[derive(Serialize, Debug)]
39-
struct AllocateResp { }
40+
struct AllocateResp {}
4041

4142
#[derive(Deserialize, Debug)]
4243
struct AllocateReq {

libsqlx-server/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use tracing_subscriber::prelude::*;
55
mod allocation;
66
mod databases;
77
mod http;
8+
mod manager;
89
mod meta;
910

1011
#[tokio::main]

libsqlx-server/src/manager.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use std::collections::HashMap;
2+
use std::path::PathBuf;
3+
use std::sync::Arc;
4+
5+
use moka::future::Cache;
6+
use tokio::sync::mpsc;
7+
use tokio::task::JoinSet;
8+
9+
use crate::allocation::{Allocation, AllocationMessage, Database};
10+
use crate::meta::MetaStore;
11+
12+
pub struct Manager {
13+
cache: Cache<String, mpsc::Sender<AllocationMessage>>,
14+
meta_store: Arc<MetaStore>,
15+
db_path: PathBuf,
16+
}
17+
18+
const MAX_ALLOC_MESSAGE_QUEUE_LEN: usize = 32;
19+
20+
impl Manager {
21+
pub async fn alloc(&self, alloc_id: &str) -> mpsc::Sender<AllocationMessage> {
22+
if let Some(sender) = self.cache.get(alloc_id) {
23+
return sender.clone();
24+
}
25+
26+
if let Some(config) = self.meta_store.meta(alloc_id).await {
27+
let path = self.db_path.join("dbs").join(alloc_id);
28+
tokio::fs::create_dir_all(&path).await.unwrap();
29+
let (alloc_sender, inbox) = mpsc::channel(MAX_ALLOC_MESSAGE_QUEUE_LEN);
30+
let alloc = Allocation {
31+
inbox,
32+
database: Database::from_config(&config, path),
33+
connections: HashMap::new(),
34+
connections_futs: JoinSet::new(),
35+
next_conn_id: 0,
36+
max_concurrent_connections: config.max_conccurent_connection,
37+
};
38+
39+
tokio::spawn(alloc.run());
40+
41+
self.cache.insert(alloc_id.to_string(), alloc_sender.clone()).await;
42+
43+
return alloc_sender;
44+
}
45+
46+
todo!("alloc doesn't exist")
47+
}
48+
}

libsqlx-server/src/meta.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,38 @@
1+
use std::path::Path;
2+
3+
use libsqlx::{libsql::LibsqlDatabase, Database as _, Connection, Program};
14
use uuid::Uuid;
5+
use tokio::sync::mpsc;
26

37
use crate::allocation::config::AllocConfig;
48

5-
pub struct MetaStore {}
9+
type ExecFn = Box<dyn FnOnce(&mut libsqlx::libsql::LibsqlConnection<()>)>;
10+
11+
pub struct MetaStore {
12+
sender: mpsc::Sender<ExecFn>,
13+
}
614

715
impl MetaStore {
8-
pub async fn allocate(&self, alloc_id: &str, meta: &AllocConfig) {}
16+
pub fn new(path: &Path) -> Self {
17+
let path = path.join("meta");
18+
let (sender, receiver) = mpsc::channel(64);
19+
tokio::task::spawn_blocking(move || {
20+
let db = LibsqlDatabase::new_plain(path).unwrap();
21+
let conn = db.connect().unwrap();
22+
conn.execute("create table if not exist allocations (id TEXT, meta BLOB)");
23+
});
24+
25+
Self { sender }
26+
27+
}
28+
pub async fn allocate(&self, alloc_id: &str, meta: &AllocConfig) {
29+
self.sender.send(Box::new(|conn| {
30+
conn.execute_program(Program::seq(&["INSERT INTO tttt u "]), result_builder)
31+
32+
}));
33+
}
934
pub async fn deallocate(&self, alloc_id: Uuid) {}
35+
pub async fn meta(&self, alloc_id: &str) -> Option<AllocConfig> {
36+
todo!()
37+
}
1038
}

libsqlx/src/connection.rs

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
use crate::program::Program;
1+
use rusqlite::types::Value;
2+
3+
use crate::QueryBuilderConfig;
4+
use crate::program::{Program, Step};
5+
use crate::query::Query;
26
use crate::result_builder::ResultBuilder;
37

48
#[derive(Debug, Clone)]
@@ -30,6 +34,52 @@ pub trait Connection {
3034

3135
/// Parse the SQL statement and return information about it.
3236
fn describe(&self, sql: String) -> crate::Result<DescribeResponse>;
37+
38+
/// execute a single query
39+
fn execute(&mut self, query: Query) -> crate::Result<Vec<Vec<Value>>> {
40+
#[derive(Default)]
41+
struct RowsBuilder {
42+
error: Option<crate::error::Error>,
43+
rows: Vec<Vec<Value>>,
44+
current_row: Vec<Value>,
45+
}
46+
47+
impl ResultBuilder for RowsBuilder {
48+
fn init(&mut self, _config: &QueryBuilderConfig) -> std::result::Result<(), crate::QueryResultBuilderError> {
49+
self.error = None;
50+
self.rows.clear();
51+
self.current_row.clear();
52+
53+
Ok(())
54+
}
55+
56+
fn add_row_value(&mut self, v: rusqlite::types::ValueRef) -> Result<(), crate::QueryResultBuilderError> {
57+
self.current_row.push(v.into());
58+
Ok(())
59+
}
60+
61+
fn finish_row(&mut self) -> Result<(), crate::QueryResultBuilderError> {
62+
let row = std::mem::take(&mut self.current_row);
63+
self.rows.push(row);
64+
65+
Ok(())
66+
}
67+
68+
fn step_error(&mut self, error: crate::error::Error) -> Result<(), crate::QueryResultBuilderError> {
69+
self.error.replace(error);
70+
Ok(())
71+
}
72+
}
73+
74+
let pgm = Program::new(vec![Step { cond: None, query }]);
75+
let mut builder = RowsBuilder::default();
76+
self.execute_program(pgm, &mut builder)?;
77+
if let Some(err) = builder.error.take() {
78+
Err(err)
79+
} else {
80+
Ok(builder.rows)
81+
}
82+
}
3383
}
3484

3585
impl Connection for Box<dyn Connection> {

libsqlx/src/database/libsql/connection.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,11 @@ fn eval_cond(cond: &Cond, results: &[bool]) -> Result<bool> {
237237
}
238238

239239
impl<C> Connection for LibsqlConnection<C> {
240-
fn execute_program(&mut self, pgm: Program, builder: &mut dyn ResultBuilder) -> crate::Result<()> {
240+
fn execute_program(
241+
&mut self,
242+
pgm: Program,
243+
builder: &mut dyn ResultBuilder,
244+
) -> crate::Result<()> {
241245
self.run(pgm, builder)
242246
}
243247

0 commit comments

Comments
 (0)