Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Limit maximum DB size #736

Merged
merged 4 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sqld/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ bincode = "1.3.3"
bottomless = { version = "0", path = "../bottomless", features = ["libsql_linked_statically"] }
bytemuck = { version = "1.13.0", features = ["derive"] }
bytes = { version = "1.2.1", features = ["serde"] }
bytesize = "1.2.0"
bytesize = { version = "1.2.0", features = ["serde"] }
clap = { version = "4.0.23", features = [ "derive", "env", "string" ] }
# console-subscriber = { version = "0.1.10", optional = true }
console-subscriber = { git = "https://github.com/tokio-rs/console.git", rev = "5a80b98", optional = true }
Expand Down
22 changes: 20 additions & 2 deletions sqld/src/connection/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;
use std::{fs, io};

use crate::error::Error;
use crate::Result;
use crate::{Result, LIBSQL_PAGE_SIZE};

#[derive(Debug)]
pub struct DatabaseConfigStore {
Expand All @@ -14,7 +14,7 @@ pub struct DatabaseConfigStore {
config: Mutex<Arc<DatabaseConfig>>,
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseConfig {
#[serde(default)]
pub block_reads: bool,
Expand All @@ -23,6 +23,24 @@ pub struct DatabaseConfig {
/// The reason why operations are blocked. This will be included in [`Error::Blocked`].
#[serde(default)]
pub block_reason: Option<String>,
/// maximum db size (in pages)
#[serde(default = "default_max_size")]
pub max_db_pages: u64,
}

const fn default_max_size() -> u64 {
bytesize::ByteSize::pb(1000).as_u64() / LIBSQL_PAGE_SIZE
}

impl Default for DatabaseConfig {
fn default() -> Self {
Self {
block_reads: Default::default(),
block_writes: Default::default(),
block_reason: Default::default(),
max_db_pages: default_max_size(),
}
}
}

impl DatabaseConfigStore {
Expand Down
10 changes: 7 additions & 3 deletions sqld/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,9 @@ where
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
state: Arc<TxnState<W>>,
) -> crate::Result<Self> {
let conn = tokio::task::spawn_blocking(move || {
Connection::new(
let max_db_size = config_store.get().max_db_pages;
let conn = tokio::task::spawn_blocking(move || -> crate::Result<_> {
let conn = Connection::new(
path.as_ref(),
extensions,
wal_hook,
Expand All @@ -206,7 +207,10 @@ where
builder_config,
current_frame_no_receiver,
state,
)
)?;
conn.conn
.pragma_update(None, "max_page_count", max_db_size)?;
Ok(conn)
})
.await
.unwrap()?;
Expand Down
53 changes: 37 additions & 16 deletions sqld/src/http/admin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ use axum::Json;
use chrono::NaiveDateTime;
use futures::TryStreamExt;
use hyper::Body;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use std::io::ErrorKind;
use std::sync::Arc;
use tokio_util::io::ReaderStream;
use url::Url;

use crate::connection::config::DatabaseConfig;
use crate::database::Database;
use crate::error::LoadDumpError;
use crate::hrana;
use crate::namespace::{DumpStream, MakeNamespace, NamespaceName, NamespaceStore, RestoreOption};
use crate::LIBSQL_PAGE_SIZE;

pub mod stats;

Expand Down Expand Up @@ -73,12 +73,21 @@ async fn handle_get_index() -> &'static str {
async fn handle_get_config<M: MakeNamespace>(
State(app_state): State<Arc<AppState<M>>>,
Path(namespace): Path<String>,
) -> crate::Result<Json<Arc<DatabaseConfig>>> {
) -> crate::Result<Json<HttpDatabaseConfig>> {
let store = app_state
.namespaces
.config_store(NamespaceName::from_string(namespace)?)
.await?;
Ok(Json(store.get()))
let config = store.get();
let max_db_size = bytesize::ByteSize::b(config.max_db_pages * LIBSQL_PAGE_SIZE);
let resp = HttpDatabaseConfig {
block_reads: config.block_reads,
block_writes: config.block_writes,
block_reason: config.block_reason.clone(),
max_db_size: Some(max_db_size),
};

Ok(Json(resp))
}

async fn handle_diagnostics<M: MakeNamespace>(
Expand Down Expand Up @@ -108,23 +117,20 @@ async fn handle_diagnostics<M: MakeNamespace>(
Ok(Json(diagnostics))
}

#[derive(Debug, Deserialize)]
struct BlockReq {
#[derive(Debug, Deserialize, Serialize)]
struct HttpDatabaseConfig {
block_reads: bool,
block_writes: bool,
#[serde(default)]
block_reason: Option<String>,
}

#[derive(Debug, Deserialize)]
struct CreateNamespaceReq {
dump_url: Option<Url>,
#[serde(default)]
max_db_size: Option<bytesize::ByteSize>,
}

async fn handle_post_config<M: MakeNamespace>(
State(app_state): State<Arc<AppState<M>>>,
Path(namespace): Path<String>,
Json(req): Json<BlockReq>,
Json(req): Json<HttpDatabaseConfig>,
) -> crate::Result<()> {
let store = app_state
.namespaces
Expand All @@ -134,12 +140,21 @@ async fn handle_post_config<M: MakeNamespace>(
config.block_reads = req.block_reads;
config.block_writes = req.block_writes;
config.block_reason = req.block_reason;
if let Some(size) = req.max_db_size {
config.max_db_pages = size.as_u64() / LIBSQL_PAGE_SIZE;
}

store.store(config)?;

Ok(())
}

#[derive(Debug, Deserialize)]
struct CreateNamespaceReq {
dump_url: Option<Url>,
max_db_size: Option<bytesize::ByteSize>,
}

async fn handle_create_namespace<M: MakeNamespace>(
State(app_state): State<Arc<AppState<M>>>,
Path(namespace): Path<String>,
Expand All @@ -150,10 +165,16 @@ async fn handle_create_namespace<M: MakeNamespace>(
None => RestoreOption::Latest,
};

app_state
.namespaces
.create(NamespaceName::from_string(namespace)?, dump)
.await?;
let namespace = NamespaceName::from_string(namespace)?;
app_state.namespaces.create(namespace.clone(), dump).await?;

if let Some(max_db_size) = req.max_db_size {
let store = app_state.namespaces.config_store(namespace).await?;
let mut config = (*store.get()).clone();
config.max_db_pages = max_db_size.as_u64() / LIBSQL_PAGE_SIZE;
store.store(config)?;
}

Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions sqld/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ mod utils;
const MAX_CONCURRENT_DBS: usize = 128;
const DB_CREATE_TIMEOUT: Duration = Duration::from_secs(1);
const DEFAULT_AUTO_CHECKPOINT: u32 = 1000;
const LIBSQL_PAGE_SIZE: u64 = 4096;

pub(crate) static BLOCKING_RT: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
Expand Down
7 changes: 2 additions & 5 deletions sqld/src/namespace/fork.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,10 @@ use crate::database::PrimaryDatabase;
use crate::replication::frame::Frame;
use crate::replication::primary::frame_stream::FrameStream;
use crate::replication::{LogReadError, ReplicationLogger};
use crate::BLOCKING_RT;
use crate::{BLOCKING_RT, LIBSQL_PAGE_SIZE};

use super::{MakeNamespace, NamespaceName, ResetCb, RestoreOption};

// FIXME: get this const from somewhere else (crate wide)
const PAGE_SIZE: usize = 4096;

type Result<T> = crate::Result<T, ForkError>;

#[derive(Debug, thiserror::Error)]
Expand All @@ -46,7 +43,7 @@ impl From<tokio::task::JoinError> for ForkError {

async fn write_frame(frame: Frame, temp_file: &mut tokio::fs::File) -> Result<()> {
let page_no = frame.header().page_no;
let page_pos = (page_no - 1) as usize * PAGE_SIZE;
let page_pos = (page_no - 1) as usize * LIBSQL_PAGE_SIZE as usize;
temp_file.seek(SeekFrom::Start(page_pos as u64)).await?;
temp_file.write_all(frame.page()).await?;

Expand Down
6 changes: 3 additions & 3 deletions sqld/src/replication/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::ops::Deref;
use bytemuck::{bytes_of, pod_read_unaligned, try_from_bytes, Pod, Zeroable};
use bytes::{Bytes, BytesMut};

use crate::replication::WAL_PAGE_SIZE;
use crate::LIBSQL_PAGE_SIZE;

use super::FrameNo;

Expand Down Expand Up @@ -45,10 +45,10 @@ impl fmt::Debug for Frame {

impl Frame {
/// size of a single frame
pub const SIZE: usize = size_of::<FrameHeader>() + WAL_PAGE_SIZE as usize;
pub const SIZE: usize = size_of::<FrameHeader>() + LIBSQL_PAGE_SIZE as usize;

pub fn from_parts(header: &FrameHeader, data: &[u8]) -> Self {
assert_eq!(data.len(), WAL_PAGE_SIZE as usize);
assert_eq!(data.len(), LIBSQL_PAGE_SIZE as usize);
let mut buf = BytesMut::with_capacity(Self::SIZE);
buf.extend_from_slice(bytes_of(header));
buf.extend_from_slice(data);
Expand Down
1 change: 0 additions & 1 deletion sqld/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crc::Crc;
pub use primary::logger::{LogReadError, ReplicationLogger, ReplicationLoggerHook};
pub use snapshot::{NamespacedSnapshotCallback, SnapshotCallback};

pub const WAL_PAGE_SIZE: i32 = 4096;
pub const WAL_MAGIC: u64 = u64::from_le_bytes(*b"SQLDWAL\0");
const CRC_64_GO_ISO: Crc<u64> = Crc::<u64>::new(&crc::CRC_64_GO_ISO);

Expand Down
19 changes: 10 additions & 9 deletions sqld/src/replication/primary/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use crate::libsql_bindings::ffi::{
use crate::libsql_bindings::wal_hook::WalHook;
use crate::replication::frame::{Frame, FrameHeader};
use crate::replication::snapshot::{find_snapshot_file, LogCompactor, SnapshotFile};
use crate::replication::{FrameNo, SnapshotCallback, CRC_64_GO_ISO, WAL_MAGIC, WAL_PAGE_SIZE};
use crate::replication::{FrameNo, SnapshotCallback, CRC_64_GO_ISO, WAL_MAGIC};
use crate::LIBSQL_PAGE_SIZE;

init_static_wal_method!(REPLICATION_METHODS, ReplicationLoggerHook);

Expand Down Expand Up @@ -375,7 +376,7 @@ pub enum LogReadError {

impl LogFile {
/// size of a single frame
pub const FRAME_SIZE: usize = size_of::<FrameHeader>() + WAL_PAGE_SIZE as usize;
pub const FRAME_SIZE: usize = size_of::<FrameHeader>() + LIBSQL_PAGE_SIZE as usize;

pub fn new(
file: File,
Expand All @@ -392,7 +393,7 @@ impl LogFile {
version: 2,
start_frame_no: 0,
magic: WAL_MAGIC,
page_size: WAL_PAGE_SIZE,
page_size: LIBSQL_PAGE_SIZE as i32,
start_checksum: 0,
db_id: db_id.as_u128(),
frame_count: 0,
Expand Down Expand Up @@ -858,14 +859,14 @@ impl ReplicationLogger {
let data_file = File::open(&data_path)?;
let size = data_path.metadata()?.len();
assert!(
size % WAL_PAGE_SIZE as u64 == 0,
size % LIBSQL_PAGE_SIZE == 0,
"database file size is not a multiple of page size"
);
let num_page = size / WAL_PAGE_SIZE as u64;
let mut buf = [0; WAL_PAGE_SIZE as usize];
let num_page = size / LIBSQL_PAGE_SIZE;
let mut buf = [0; LIBSQL_PAGE_SIZE as usize];
let mut page_no = 1; // page numbering starts at 1
for i in 0..num_page {
data_file.read_exact_at(&mut buf, i * WAL_PAGE_SIZE as u64)?;
data_file.read_exact_at(&mut buf, i * LIBSQL_PAGE_SIZE)?;
log_file.push_page(&WalPage {
page_no,
size_after: if i == num_page - 1 { num_page as _ } else { 0 },
Expand Down Expand Up @@ -977,9 +978,9 @@ pub fn checkpoint_db(data_path: &Path) -> anyhow::Result<()> {
conn.pragma_query(None, "page_size", |row| {
let page_size = row.get::<_, i32>(0).unwrap();
assert_eq!(
page_size, WAL_PAGE_SIZE,
page_size, LIBSQL_PAGE_SIZE as i32,
"invalid database file, expected page size to be {}, but found {} instead",
WAL_PAGE_SIZE, page_size
LIBSQL_PAGE_SIZE, page_size
);
Ok(())
})?;
Expand Down
5 changes: 3 additions & 2 deletions sqld/src/replication/replica/hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use sqld_libsql_bindings::init_static_wal_method;
use sqld_libsql_bindings::{ffi::types::XWalFrameFn, wal_hook::WalHook};

use crate::replication::frame::{Frame, FrameBorrowed};
use crate::replication::{FrameNo, WAL_PAGE_SIZE};
use crate::replication::FrameNo;
use crate::LIBSQL_PAGE_SIZE;

use super::snapshot::TempSnapshot;

Expand Down Expand Up @@ -123,7 +124,7 @@ impl InjectorHookCtx {
let ret = unsafe {
orig(
wal,
WAL_PAGE_SIZE,
LIBSQL_PAGE_SIZE as i32,
page_headers.as_ptr(),
size_after,
(size_after != 0) as _,
Expand Down