Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

turn off sqlite auto_checkpoint and introduce period-based checkpoint calls #547

Merged
merged 12 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion bottomless/src/backup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::ops::Range;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::Sender;
use tokio::time::Instant;
use uuid::Uuid;

#[derive(Debug)]
Expand Down Expand Up @@ -85,6 +86,7 @@ impl WalCopier {
tracing::trace!("Flushing {} frames locally.", frames.len());

for start in frames.clone().step_by(self.max_frames_per_batch) {
let period_start = Instant::now();
let timestamp = chrono::Utc::now().timestamp() as u64;
let end = (start + self.max_frames_per_batch as u32).min(frames.end);
let len = (end - start) as usize;
Expand Down Expand Up @@ -112,8 +114,9 @@ impl WalCopier {
}
}
if tracing::enabled!(tracing::Level::DEBUG) {
let elapsed = Instant::now() - period_start;
let file_len = out.metadata().await?.len();
tracing::debug!("written {} bytes to {}", file_len, fdesc);
tracing::debug!("written {} bytes to {} in {:?}", file_len, fdesc, elapsed);
}
drop(out);
if self.outbox.send(fdesc).await.is_err() {
Expand Down
3 changes: 3 additions & 0 deletions bottomless/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::ffi::{
bottomless_methods, libsql_wal_methods, sqlite3, sqlite3_file, sqlite3_vfs, PgHdr, Wal,
};
use std::ffi::{c_char, c_void};
use tokio::time::Instant;

// Just heuristics, but should work for ~100% of cases
fn is_regular(vfs: *const sqlite3_vfs) -> bool {
Expand Down Expand Up @@ -300,6 +301,7 @@ pub extern "C" fn xCheckpoint(
backfilled_frames: *mut i32,
) -> i32 {
tracing::trace!("Checkpoint");
let start = Instant::now();

/* In order to avoid partial checkpoints, passive checkpoint
** mode is not allowed. Only TRUNCATE checkpoints are accepted,
Expand Down Expand Up @@ -366,6 +368,7 @@ pub extern "C" fn xCheckpoint(
);
return ffi::SQLITE_IOERR_WRITE;
}
tracing::debug!("Checkpoint completed in {:?}", Instant::now() - start);

ffi::SQLITE_OK
}
Expand Down
13 changes: 10 additions & 3 deletions bottomless/src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl Default for Options {
Options {
create_bucket_if_not_exists: true,
verify_crc: true,
use_compression: CompressionKind::None,
use_compression: CompressionKind::Gzip,
Copy link
Collaborator

@haaawk haaawk Aug 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this change related to auto checkpointing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I left it in when I started running comparisons with Litestream. Keeping uncompressed WAL frames around is extremely expensive, and we definitely shouldn't do that by default.

max_batch_interval: Duration::from_secs(15),
max_frames_per_batch: 500, // basically half of the default SQLite checkpoint size
s3_upload_max_parallelism: 32,
Expand All @@ -213,6 +213,10 @@ impl Replicator {
}

pub async fn with_options<S: Into<String>>(db_path: S, options: Options) -> Result<Self> {
tracing::trace!(
"Starting bottomless replicator with options: {:#?}",
options
);
let config = options.client_config().await;
let client = Client::from_conf(config);
let bucket = options.bucket_name.clone();
Expand Down Expand Up @@ -306,6 +310,7 @@ impl Replicator {
let sem = Arc::new(tokio::sync::Semaphore::new(max_parallelism));
while let Some(fdesc) = frames_inbox.recv().await {
tracing::trace!("Received S3 upload request: {}", fdesc);
let start = Instant::now();
let sem = sem.clone();
let permit = sem.acquire_owned().await.unwrap();
let client = client.clone();
Expand All @@ -324,7 +329,8 @@ impl Replicator {
tracing::error!("Failed to send {} to S3: {}", fpath, e);
} else {
tokio::fs::remove_file(&fpath).await.unwrap();
tracing::trace!("Uploaded to S3: {}", fpath);
let elapsed = Instant::now() - start;
tracing::debug!("Uploaded to S3: {} in {:?}", fpath, elapsed);
}
drop(permit);
});
Expand Down Expand Up @@ -608,6 +614,7 @@ impl Replicator {
return Ok(false);
}
tracing::debug!("Snapshotting {}", self.db_path);
let start = Instant::now();
let change_counter = match self.use_compression {
CompressionKind::None => {
self.client
Expand Down Expand Up @@ -651,7 +658,7 @@ impl Replicator {
.body(ByteStream::from(Bytes::copy_from_slice(&change_counter)))
.send()
.await?;
tracing::debug!("Main db snapshot complete");
tracing::debug!("Main db snapshot complete in {:?}", Instant::now() - start);
Ok(true)
}

Expand Down
7 changes: 6 additions & 1 deletion sqld-libsql-bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl<'a> Connection<'a> {
// it has been instantiated and lives for long enough
_wal_hook: &'static WalMethodsHook<W>,
hook_ctx: &'a mut W::Context,
auto_checkpoint: u32,
) -> Result<Self, rusqlite::Error> {
let path = path.as_ref().join("data");
tracing::trace!(
Expand All @@ -87,7 +88,7 @@ impl<'a> Connection<'a> {
// We pass a pointer to the WAL methods data to the database connection. This means
// that the reference must outlive the connection. This is guaranteed by the marker in
// the returned connection.
let rc = rusqlite::ffi::libsql_open_v2(
let mut rc = rusqlite::ffi::libsql_open_v2(
filename.as_ptr(),
&mut db as *mut _,
flags.bits(),
Expand All @@ -96,6 +97,10 @@ impl<'a> Connection<'a> {
hook_ctx as *mut _ as *mut _,
);

if rc == 0 {
rc = rusqlite::ffi::sqlite3_wal_autocheckpoint(db, auto_checkpoint as _);
}

if rc != 0 {
rusqlite::ffi::sqlite3_close(db);
return Err(rusqlite::Error::SqliteFailure(
Expand Down
3 changes: 2 additions & 1 deletion sqld/src/connection/dump/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ impl DumpLoader {

let (ok_snd, ok_rcv) = oneshot::channel::<anyhow::Result<()>>();
tokio::task::spawn_blocking(move || {
let auto_checkpoint = logger.auto_checkpoint;
let mut ctx = ReplicationLoggerHookCtx::new(logger, bottomless_replicator);
let mut retries = 0;
let db = loop {
match open_db(&path, &REPLICATION_METHODS, &mut ctx, None) {
match open_db(&path, &REPLICATION_METHODS, &mut ctx, None, auto_checkpoint) {
Ok(db) => {
if ok_snd.send(Ok(())).is_ok() {
break db;
Expand Down
16 changes: 13 additions & 3 deletions sqld/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub struct LibSqlDbFactory<W: WalHook + 'static> {
extensions: Vec<PathBuf>,
max_response_size: u64,
max_total_response_size: u64,
auto_checkpoint: u32,
/// In wal mode, closing the last database takes time, and causes other databases creation to
/// return sqlite busy. To mitigate that, we hold on to one connection
_db: Option<LibSqlConnection>,
Expand All @@ -53,6 +54,7 @@ where
extensions: Vec<PathBuf>,
max_response_size: u64,
max_total_response_size: u64,
auto_checkpoint: u32,
) -> Result<Self>
where
F: Fn() -> W::Context + Sync + Send + 'static,
Expand All @@ -66,6 +68,7 @@ where
extensions,
max_response_size,
max_total_response_size,
auto_checkpoint,
_db: None,
};

Expand Down Expand Up @@ -115,6 +118,7 @@ where
QueryBuilderConfig {
max_size: Some(self.max_response_size),
max_total_size: Some(self.max_total_response_size),
auto_checkpoint: self.auto_checkpoint,
},
)
.await
Expand Down Expand Up @@ -144,6 +148,7 @@ pub fn open_db<'a, W>(
wal_methods: &'static WalMethodsHook<W>,
hook_ctx: &'a mut W::Context,
flags: Option<OpenFlags>,
auto_checkpoint: u32,
) -> Result<sqld_libsql_bindings::Connection<'a>, rusqlite::Error>
where
W: WalHook,
Expand All @@ -154,8 +159,7 @@ where
| OpenFlags::SQLITE_OPEN_URI
| OpenFlags::SQLITE_OPEN_NO_MUTEX,
);

sqld_libsql_bindings::Connection::open(path, flags, wal_methods, hook_ctx)
sqld_libsql_bindings::Connection::open(path, flags, wal_methods, hook_ctx, auto_checkpoint)
}

impl LibSqlConnection {
Expand Down Expand Up @@ -254,7 +258,13 @@ impl<'a> Connection<'a> {
builder_config: QueryBuilderConfig,
) -> Result<Self> {
let this = Self {
conn: open_db(path, wal_methods, hook_ctx, None)?,
conn: open_db(
path,
wal_methods,
hook_ctx,
None,
builder_config.auto_checkpoint,
)?,
timeout_deadline: None,
timed_out: false,
stats,
Expand Down
3 changes: 2 additions & 1 deletion sqld/src/connection/write_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::rpc::proxy::rpc::proxy_client::ProxyClient;
use crate::rpc::proxy::rpc::query_result::RowResult;
use crate::rpc::proxy::rpc::{DisconnectMessage, ExecuteResults};
use crate::stats::Stats;
use crate::Result;
use crate::{Result, DEFAULT_AUTO_CHECKPOINT};

use super::config::DatabaseConfigStore;
use super::libsql::LibSqlConnection;
Expand Down Expand Up @@ -85,6 +85,7 @@ impl MakeConnection for MakeWriteProxyConnection {
QueryBuilderConfig {
max_size: Some(self.max_response_size),
max_total_size: Some(self.max_total_response_size),
auto_checkpoint: DEFAULT_AUTO_CHECKPOINT,
},
self.namespace.clone(),
)
Expand Down
66 changes: 65 additions & 1 deletion sqld/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::error::Error;
use crate::stats::Stats;

use sha256::try_digest;
use tokio::time::{interval, sleep, Instant, MissedTickBehavior};

pub use sqld_libsql_bindings as libsql;

Expand All @@ -61,6 +62,7 @@ pub mod version;
const MAX_CONCURRENT_DBS: usize = 128;
const DB_CREATE_TIMEOUT: Duration = Duration::from_secs(1);
const DEFAULT_NAMESPACE_NAME: &str = "default";
const DEFAULT_AUTO_CHECKPOINT: u32 = 1000;

#[derive(clap::ValueEnum, Clone, Debug, PartialEq)]
pub enum Backend {
Expand Down Expand Up @@ -107,6 +109,7 @@ pub struct Config {
pub snapshot_exec: Option<String>,
pub disable_default_namespace: bool,
pub disable_namespaces: bool,
pub checkpoint_interval: Option<Duration>,
}

impl Default for Config {
Expand Down Expand Up @@ -148,6 +151,7 @@ impl Default for Config {
snapshot_exec: None,
disable_default_namespace: false,
disable_namespaces: true,
checkpoint_interval: None,
}
}
}
Expand Down Expand Up @@ -462,6 +466,7 @@ async fn start_primary(
max_response_size: config.max_response_size,
load_from_dump: None,
max_total_response_size: config.max_total_response_size,
checkpoint_interval: config.checkpoint_interval,
};
let factory = PrimaryNamespaceMaker::new(conf);
let namespaces = Arc::new(NamespaceStore::new(factory));
Expand Down Expand Up @@ -527,7 +532,9 @@ async fn run_storage_monitor(db_path: PathBuf, stats: Stats) -> anyhow::Result<(
// initialize a connection here, and keep it alive for the entirety of the program. If we
// fail to open it, we wait for `duration` and try again later.
let ctx = &mut ();
let maybe_conn = match open_db(&db_path, &TRANSPARENT_METHODS, ctx, Some(rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY)) {
// We can safely open db with DEFAULT_AUTO_CHECKPOINT, since monitor is read-only: it
// won't produce new updates, frames or generate checkpoints.
let maybe_conn = match open_db(&db_path, &TRANSPARENT_METHODS, ctx, Some(rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY), DEFAULT_AUTO_CHECKPOINT) {
Ok(conn) => Some(conn),
Err(e) => {
tracing::warn!("failed to open connection for storager monitor: {e}, trying again in {duration:?}");
Expand Down Expand Up @@ -563,6 +570,57 @@ async fn run_storage_monitor(db_path: PathBuf, stats: Stats) -> anyhow::Result<(
Ok(())
}

/// Periodically forces a `TRUNCATE` WAL checkpoint on the database at `<db_path>/data`.
///
/// One checkpoint is attempted per tick of a `period`-long interval; note that a tokio
/// interval's first tick completes immediately. Missed ticks are delayed rather than
/// replayed in a burst. Every attempt opens a fresh connection and runs on the blocking
/// thread pool, so the async runtime is never stalled by SQLite I/O.
///
/// * When the database cannot be opened or the checkpoint call returns a non-zero code, a
///   warning is logged and the attempt is repeated after `RETRY_INTERVAL` instead of
///   waiting for the next tick.
/// * When SQLite succeeds but reports a WAL size of `-1`, the database is not in WAL
///   journal mode: a warning is logged and the task finishes with `Ok(())`.
/// * A zero `period` disables the task: `tokio::time::interval` panics on a zero period,
///   so it is rejected up front with a warning and `Ok(())`.
///
/// # Errors
///
/// Returns an error only when the blocking checkpoint task itself cannot be joined
/// (it panicked or was cancelled).
async fn run_checkpoint_cron(db_path: PathBuf, period: Duration) -> anyhow::Result<()> {
    /// Result of a single checkpoint attempt.
    enum Attempt {
        /// The checkpoint ran; wait for the next regular tick.
        Done,
        /// The attempt failed; try again after the given delay.
        RetryAfter(Duration),
        /// The database is not in WAL journal mode; checkpointing is pointless.
        NotWal,
    }

    const RETRY_INTERVAL: Duration = Duration::from_secs(60);

    if period.is_zero() {
        tracing::warn!("checkpoint interval is zero - periodic checkpoints are disabled");
        return Ok(());
    }

    let data_path = db_path.join("data");
    tracing::info!("setting checkpoint interval to {:?}", period);
    let mut interval = interval(period);
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

    // `Some(delay)` after a failed attempt: sleep for `delay` instead of waiting for a tick.
    let mut retry_after: Option<Duration> = None;
    loop {
        match retry_after.take() {
            Some(delay) => sleep(delay).await,
            None => {
                interval.tick().await;
            }
        }

        let data_path = data_path.clone();
        let attempt = tokio::task::spawn_blocking(move || {
            let conn = match rusqlite::Connection::open(&data_path) {
                Ok(conn) => conn,
                Err(err) => {
                    tracing::warn!("couldn't connect to '{:?}': {}", data_path, err);
                    return Attempt::RetryAfter(RETRY_INTERVAL);
                }
            };

            let start = Instant::now();
            // Receives `pnLog`: the WAL size in frames, or -1 when no checkpoint could run.
            let mut wal_frames: std::ffi::c_int = 0;
            // SAFETY: `conn` is a live connection owned by this closure, so its raw handle
            // is valid for the duration of the call; the schema-name and `pnCkpt` pointers
            // may be null, and `wal_frames` outlives the call.
            let rc = unsafe {
                rusqlite::ffi::sqlite3_wal_checkpoint_v2(
                    conn.handle(),
                    std::ptr::null(),
                    libsql::ffi::SQLITE_CHECKPOINT_TRUNCATE,
                    &mut wal_frames as *mut _,
                    std::ptr::null_mut(),
                )
            };

            // 0 is SQLITE_OK.
            if rc != 0 {
                tracing::warn!("failed to execute checkpoint - error code: {}", rc);
                return Attempt::RetryAfter(RETRY_INTERVAL);
            }
            if wal_frames == -1 {
                return Attempt::NotWal;
            }
            tracing::info!("database checkpoint (took: {:?})", start.elapsed());
            Attempt::Done
        })
        .await?;

        match attempt {
            Attempt::Done => {}
            Attempt::RetryAfter(delay) => retry_after = Some(delay),
            Attempt::NotWal => {
                tracing::warn!("database was not set in WAL journal mode");
                return Ok(());
            }
        }
    }
}

/// Builds the sentinel path by appending the `.sentinel` component to `path`.
fn sentinel_file_path(path: &Path) -> PathBuf {
    let mut sentinel = path.to_path_buf();
    sentinel.push(".sentinel");
    sentinel
}
Expand Down Expand Up @@ -685,6 +743,12 @@ pub async fn run_server(config: Config) -> anyhow::Result<()> {
join_set.spawn(run_storage_monitor(config.db_path.clone(), stats));
}

if let Some(interval) = config.checkpoint_interval {
if config.bottomless_replication.is_some() {
join_set.spawn(run_checkpoint_cron(config.db_path.clone(), interval));
}
}

loop {
tokio::select! {
_ = shutdown_receiver.recv() => {
Expand Down
6 changes: 6 additions & 0 deletions sqld/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ struct Cli {
#[clap(long, env = "SQLD_SNAPSHOT_EXEC")]
snapshot_exec: Option<String>,

/// Interval, in seconds, at which a WAL checkpoint is triggered.
/// By default, the interval is 1 hour.
#[clap(long, env = "SQLD_CHECKPOINT_INTERVAL_S")]
checkpoint_interval_s: Option<u64>,

/// By default, all requests for which a namespace can't be determined fall back to the default
/// namespace `default`. This flag disables that.
#[clap(long)]
Expand Down Expand Up @@ -304,6 +309,7 @@ fn config_from_args(args: Cli) -> Result<Config> {
snapshot_exec: args.snapshot_exec,
disable_default_namespace: args.disable_default_namespace,
disable_namespaces: !args.enable_namespaces,
checkpoint_interval: args.checkpoint_interval_s.map(Duration::from_secs),
})
}

Expand Down
Loading