Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

backbone replication #379

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 160 additions & 71 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion bottomless/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub mod replicator;
use crate::ffi::{
bottomless_methods, libsql_wal_methods, sqlite3, sqlite3_file, sqlite3_vfs, PgHdr, Wal,
};
use std::ffi::{c_char, c_void};
use std::ffi::{c_char, c_int, c_void};

// Just heuristics, but should work for ~100% of cases
fn is_regular(vfs: *const sqlite3_vfs) -> bool {
Expand Down Expand Up @@ -238,6 +238,8 @@ pub extern "C" fn xFrames(
size_after: u32,
is_commit: i32,
sync_flags: i32,
_precommit_cb: Option<unsafe extern "C" fn(ctx: *mut c_void) -> c_int>,
_precommit_ctx: *mut c_void,
) -> i32 {
let mut last_consistent_frame = 0;
if !is_local() {
Expand Down Expand Up @@ -282,6 +284,8 @@ pub extern "C" fn xFrames(
size_after,
is_commit,
sync_flags,
None,
std::ptr::null_mut(),
)
};
if is_local() || rc != ffi::SQLITE_OK {
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
profile = "default"
channel = "1.67.0"
channel = "1.69.0"
10 changes: 8 additions & 2 deletions sqld-libsql-bindings/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@ edition = "2021"
anyhow = "1.0.66"
mvfs = { git = "https://github.com/psarna/mvsqlite", branch = "mwal", optional = true }
mwal = { git = "https://github.com/psarna/mvsqlite", branch = "mwal", optional = true }
rusqlite = { version = "0.29.0", git = "https://github.com/psarna/rusqlite", rev = "a6332e530f30dc2d47110", default-features = false, features = [
# rusqlite = { version = "0.29.0", git = "https://github.com/psarna/rusqlite", rev = "a6332e530f30dc2d47110", default-features = false, features = [
# "buildtime_bindgen",
# "bundled-libsql-wasm-experimental",
# "column_decltype"
# ] }
rusqlite = { version = "0.29.0", path = "../../rusqlite", default-features = false, features = [
"buildtime_bindgen",
"bundled-libsql-wasm-experimental",
"column_decltype"
"column_decltype",
"load_extension"
] }
tracing = "0.1.37"

Expand Down
2 changes: 2 additions & 0 deletions sqld-libsql-bindings/src/ffi/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ pub type XWalFrameFn = unsafe extern "C" fn(
size_after: u32,
is_commit: c_int,
sync_flags: c_int,
precommit_cb: Option<unsafe extern "C" fn(ctx: *mut c_void) -> c_int>,
precommit_ctx: *mut c_void,
) -> c_int;
pub type XWalUndoFn = unsafe extern "C" fn(
wal: *mut Wal,
Expand Down
4 changes: 4 additions & 0 deletions sqld-libsql-bindings/src/wal_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub unsafe trait WalHook {
size_after,
is_commit,
sync_flags,
None,
std::ptr::null_mut(),
)
}
}
Expand Down Expand Up @@ -230,6 +232,8 @@ pub extern "C" fn xFrames(
size_after: u32,
is_commit: c_int,
sync_flags: c_int,
_precommit_cb: Option<unsafe extern "C" fn(ctx: *mut c_void) -> c_int>,
_precommit_ctx: *mut c_void,
) -> c_int {
let orig_methods = unsafe { get_orig_methods(wal) };
let methods = unsafe { get_methods(wal) };
Expand Down
9 changes: 8 additions & 1 deletion sqld/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@ pin-project-lite = "0.2.9"
postgres-protocol = "0.6.4"
prost = "0.11.3"
regex = "1.7.0"
rusqlite = { version = "0.29.0", git = "https://github.com/psarna/rusqlite", rev = "a6332e530f30dc2d47110", default-features = false, features = [
# rusqlite = { version = "0.29.0", git = "https://github.com/psarna/rusqlite", rev = "a6332e530f30dc2d47110", default-features = false, features = [
# "buildtime_bindgen",
# "bundled-libsql-wasm-experimental",
# "column_decltype",
# "load_extension"
# ] }
rusqlite = { version = "0.29.0", path = "../../rusqlite", default-features = false, features = [
"buildtime_bindgen",
"bundled-libsql-wasm-experimental",
"column_decltype",
Expand All @@ -59,6 +65,7 @@ tempfile = "3.3.0"
memmap = "0.7.0"
mimalloc = "0.1.36"
sha256 = "1.1.3"
rdkafka = { git = "https://github.com/fede1024/rust-rdkafka.git", rev = "611aeea" }

[dev-dependencies]
proptest = "1.0.0"
Expand Down
61 changes: 61 additions & 0 deletions sqld/src/database/backbone/init.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::{ClientConfig, Message};

use crate::database::backbone::MetaMessage;

use super::{BackboneDatabase, Role};

pub struct InitState<'a> {
pub backbone: &'a mut BackboneDatabase,
consumer: StreamConsumer,
}

impl<'a> InitState<'a> {
pub fn new(backbone: &'a mut BackboneDatabase) -> anyhow::Result<Self> {
let mut config = ClientConfig::new();
config
.set("group.id", &backbone.config.node_id)
.set(
"bootstrap.servers",
backbone
.config
.kafka_bootstrap_servers
.first()
.unwrap()
.to_string(),
)
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "false");

let consumer: StreamConsumer =
config.clone().set("auto.offset.reset", "latest").create()?;
consumer.subscribe(&[&backbone.config.cluster_id])?;

Ok(Self { consumer, backbone })
}

pub async fn run(self) -> anyhow::Result<Role<'a>> {
tracing::info!("entering idle state");
dbg!(&self.backbone.config.node_id);
dbg!();
loop {
let msg = self.consumer.recv().await?;
if msg.key() == Some(b"meta") {
match msg.payload() {
Some(payload) => {
let meta: MetaMessage = serde_json::from_slice(payload)?;
if self.backbone.term >= meta.term {
continue;
}
let offset = msg.offset();
drop(msg); // holding a ref to the message while dropping the consumer causes a
// deadlock in the Drop implementation of StreamConsumer
return Role::transition(self, &meta, offset);
}
None => anyhow::bail!("message with empty payload"),
}
}
}
}
}
Loading