Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serializable Snapshot Isolation (SSI) #79

Merged
merged 85 commits into from
Oct 19, 2024
Merged
Show file tree
Hide file tree
Changes from 67 commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
9221403
work in progress multi-writer tx
jeromegn Sep 22, 2024
6f69260
added ssi-swap unit test
marvin-j97 Sep 22, 2024
f3fb0c4
Merge pull request #1 from marvin-j97/patch-1
jeromegn Sep 22, 2024
252fbbe
clean up code a bit more, separate conflict manager and conflict chec…
jeromegn Sep 22, 2024
007065d
convert prefix to a range instead, should be more performant
jeromegn Sep 22, 2024
803639e
clippy
jeromegn Sep 22, 2024
9a77a54
Merge branch 'main' into ssi-tx
jeromegn Sep 22, 2024
fc8d8ba
doc: add ssi to readme
marvin-j97 Sep 22, 2024
9c900c4
refactor: remove inline attributes
marvin-j97 Sep 22, 2024
a5f1894
enable unwrap_used lint again
marvin-j97 Sep 22, 2024
ac492b1
reduce msrv back to 1.74
marvin-j97 Sep 22, 2024
0b6bde4
Merge branch 'main' into ssi-tx
marvin-j97 Sep 22, 2024
c9e098d
cleanup 1
marvin-j97 Sep 22, 2024
f20e452
cleanup 2
marvin-j97 Sep 22, 2024
ad21bce
fix merge
marvin-j97 Sep 22, 2024
be29646
separate SSI and Single Writer transactions, enable either with a fea…
jeromegn Sep 23, 2024
21cc3a4
add a few more tests for SSI, including anti-dependency cycles
jeromegn Sep 23, 2024
f442331
add nextest config
marvin-j97 Sep 23, 2024
73263db
bring back single_writer_tx as the default tx type
jeromegn Sep 23, 2024
6a0c234
fix TxPartitionHandle implementation for both ssi_tx and single_write…
jeromegn Sep 23, 2024
4b8de46
adjust SSI testing
marvin-j97 Sep 23, 2024
44eeb70
Merge remote-tracking branch 'jeromegn/ssi-tx' into ssi-tx
marvin-j97 Sep 23, 2024
ef808d7
fix: ssi write_tx open
marvin-j97 Sep 23, 2024
8e4f525
default to single writer tx again
marvin-j97 Sep 23, 2024
eb4196e
get rid of wmark and implement our own simplified Oracle
jeromegn Sep 23, 2024
f4c8c29
Merge branch 'main' into ssi-tx
jeromegn Sep 24, 2024
eb1a5e1
added tests for update_fetch, fetch_update, range and prefix
jeromegn Sep 24, 2024
9283923
remove resolved TODOs
jeromegn Sep 24, 2024
b3bb2bd
Merge branch 'main' into ssi-tx
marvin-j97 Sep 24, 2024
17f80c2
simplfy TestEnv for WriteTransaction tests
jeromegn Sep 24, 2024
82609a9
test that the oracle garbage collects past transactions
jeromegn Sep 24, 2024
6f5725d
make take public
jeromegn Sep 24, 2024
6e52236
upgrade tower to fix axum's tower version
jeromegn Sep 24, 2024
8dd94a8
add ssi doc tests to CI
marvin-j97 Sep 24, 2024
caf1aed
newline
marvin-j97 Sep 24, 2024
5030e3b
acquire write_serialize_lock before creating a new write transaction …
jeromegn Sep 25, 2024
7f1c619
mod docs to single_writer, rename WriteTransaction to BaseTransaction…
jeromegn Sep 25, 2024
7b4faf7
fix import name change
jeromegn Sep 25, 2024
07e7de4
fix docs rendering for WriteTransaction
jeromegn Sep 25, 2024
a71852f
remove mutex inside conflict manager
marvin-j97 Sep 25, 2024
174a738
mark tx write ops as slow??
marvin-j97 Sep 25, 2024
1f1345d
remove concept of ConflictChecker now that we don't have mutexes
jeromegn Sep 25, 2024
f8bfc60
switch to a FnMut for the closure of fetch_update and update_fetch
jeromegn Sep 25, 2024
ee230a7
Merge branch 'main' into ssi-tx
jeromegn Sep 25, 2024
5e70489
Update README.md
marvin-j97 Sep 25, 2024
789d009
bind iterator lifetimes to transactions
jeromegn Sep 25, 2024
73482c4
fix iterator lifetimes on single writer tx
jeromegn Sep 25, 2024
d42e911
test(ssi): add 2 write skew tests
marvin-j97 Sep 25, 2024
9b8d47c
fmt
marvin-j97 Sep 25, 2024
f7ee40d
fmt
marvin-j97 Sep 25, 2024
d3a8d54
refactor
marvin-j97 Sep 25, 2024
98701e2
rename orc
marvin-j97 Sep 25, 2024
5355d6d
update comment
marvin-j97 Sep 25, 2024
92e5d55
fmt
marvin-j97 Sep 25, 2024
38fa766
fmt
marvin-j97 Sep 25, 2024
2c4489f
todo comment
marvin-j97 Sep 26, 2024
bf555b1
Update keyspace.rs
marvin-j97 Sep 26, 2024
7af81aa
remove oracle::Error and therefore ssi::Error, use crate's Error::Poi…
jeromegn Sep 26, 2024
2c0cf8e
Merge branch 'main' into ssi-tx
jeromegn Sep 26, 2024
3e1f06a
use lsm-tree 2.0.2
jeromegn Sep 26, 2024
9504fd4
re-use range function instead of a custom prefix function
jeromegn Sep 26, 2024
065bbda
Merge branch 'main' into ssi-tx
jeromegn Sep 26, 2024
d4e7c7b
Update keyspace.rs
marvin-j97 Sep 26, 2024
327260f
Update keyspace.rs
marvin-j97 Sep 26, 2024
43b0944
add ssi-tx example
marvin-j97 Sep 27, 2024
acc9dcf
Merge branch 'main' into ssi-tx
marvin-j97 Sep 27, 2024
aa5f2c1
Merge branch 'main' into ssi-tx
marvin-j97 Sep 28, 2024
a04b2e2
Merge branch 'main' into ssi-tx
marvin-j97 Sep 29, 2024
33ca1cf
Merge branch 'main' into ssi-tx
marvin-j97 Oct 2, 2024
308060d
refactor
marvin-j97 Oct 10, 2024
1a0cb76
add $ to partition name charset
marvin-j97 Oct 13, 2024
3b4661b
doc: partition name
marvin-j97 Oct 13, 2024
630bb0b
test: partition names
marvin-j97 Oct 18, 2024
cfc610e
version
marvin-j97 Oct 18, 2024
1e74c5c
Merge branch '2.2.0' into ssi-tx
marvin-j97 Oct 18, 2024
9abb595
don't insert in FAU/UAF if value hasn't changed
marvin-j97 Oct 18, 2024
489c273
add SSI tx + GC unit test
marvin-j97 Oct 19, 2024
d04fe4c
rename
marvin-j97 Oct 19, 2024
e2d3e69
Merge branch 'ssi' into ssi-tx
marvin-j97 Oct 19, 2024
40499fc
clippy
marvin-j97 Oct 19, 2024
6a0501b
Revert "clippy"
marvin-j97 Oct 19, 2024
1de3a8c
clippy ignore
marvin-j97 Oct 19, 2024
2a473c2
Merge branch 'main' into ssi-tx
marvin-j97 Oct 19, 2024
3509eac
example: add atomic SSI counter
marvin-j97 Oct 19, 2024
e868f20
add more tx examples
marvin-j97 Oct 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[profile.default]
slow-timeout = "1m"
4 changes: 4 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,12 @@ jobs:
run: cargo test --features __internal_whitebox -- whitebox_ --test-threads=1
- name: Run tests
run: cargo nextest run --features lz4,miniz,single_writer_tx,bloom
- name: Run SSI tests
run: cargo nextest run --no-default-features --features ssi_tx tx_ssi_
- name: Run doc tests
run: cargo test --doc
- name: Run SSI doc tests
run: cargo test --no-default-features --features ssi_tx --doc
- name: Build & test examples
run: node compile_examples.mjs
cross:
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ lz4 = ["lsm-tree/lz4"]
miniz = ["lsm-tree/miniz"]
bloom = ["lsm-tree/bloom"]
single_writer_tx = []
ssi_tx = []
__internal_whitebox = []

[dependencies]
Expand All @@ -42,6 +43,7 @@ rand = "0.8.5"

[package.metadata.cargo-all-features]
denylist = ["__internal_whitebox"]
skip_feature_sets = [["ssi_tx", "single_writer_tx"]]

[[bench]]
name = "lsmt"
Expand Down
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Fjall is an LSM-based embeddable key-value storage engine written in Rust. It fe
- Automatic background maintenance
- Partitions (a.k.a. column families) with cross-partition atomic semantics
- Built-in compression (default = LZ4)
- Single-writer, multi-reader transactions (optional)
- Serializable transactions (optional)
- Key-value separation for large blob use cases (optional)

Each `Keyspace` is a single logical database and is split into `partitions` (a.k.a. column families) - you should probably only use a single keyspace for your application. Each partition is physically a single LSM-tree and its own logical collection (a persistent, sorted map); however, write operations across partitions are atomic as they are persisted in a single keyspace-level journal, which will be recovered on restart.
Expand Down Expand Up @@ -117,6 +117,13 @@ Allows opening a transactional Keyspace for single-writer (serialized) transacti

*Enabled by default.*

### ssi_tx

Allows opening a transactional Keyspace for multi-writer, serializable transactions, allowing RYOW (read-your-own-write), fetch-and-update and other atomic operations.
Conflict checking is done using optimistic concurrency control.

*Disabled by default.*

## Stable disk format

The disk format is stable as of 1.0.0.
Expand Down
Empty file added examples/tx-ssi-cc/.run
Empty file.
13 changes: 13 additions & 0 deletions examples/tx-ssi-cc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "tx-ssi-cc"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
fjall = { path = "../../", default-features = false, features = [
"bloom",
"lz4",
"ssi_tx",
] }
3 changes: 3 additions & 0 deletions examples/tx-ssi-cc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# tx-ssi-cc

This example demonstrates concurrent transactions using SSI (serializable snapshot isolation).
53 changes: 53 additions & 0 deletions examples/tx-ssi-cc/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use std::time::{Duration, Instant};

fn main() -> fjall::Result<()> {
let keyspace = fjall::Config::default()
.temporary(true)
.open_transactional()?;
let items = keyspace.open_partition("items", Default::default())?;

let start = Instant::now();

let t1 = {
let keyspace = keyspace.clone();
let items = items.clone();

std::thread::spawn(move || {
let mut wtx = keyspace.write_tx().unwrap();
println!("Started tx1");
std::thread::sleep(Duration::from_secs(3));
wtx.insert(&items, "a", "a");
wtx.commit()
})
};

let t2 = {
let keyspace = keyspace.clone();
let items = items.clone();

std::thread::spawn(move || {
let mut wtx = keyspace.write_tx().unwrap();
println!("Started tx2");
std::thread::sleep(Duration::from_secs(3));
wtx.insert(&items, "b", "b");
wtx.commit()
})
};

t1.join()
.expect("should join")?
.expect("tx should not fail");

t2.join()
.expect("should join")?
.expect("tx should not fail");

// NOTE: We would expect a single writer tx implementation to finish in
// ~6 seconds
println!("Done in {:?}, items.len={}", start.elapsed(), {
let rtx = keyspace.read_tx();
rtx.len(&items)?
});

Ok(())
}
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl Config {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
#[cfg(feature = "single_writer_tx")]
#[cfg(any(feature = "single_writer_tx", feature = "ssi_tx"))]
pub fn open_transactional(self) -> crate::Result<crate::TxKeyspace> {
crate::TxKeyspace::open(self)
}
Expand Down
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub enum Error {
/// Invalid or unparsable data format version
InvalidVersion(Option<Version>),

/// A previous flush operation failed, indicating a hardware-related failure
/// A previous flush / commit operation failed, indicating a hardware-related failure
///
/// Future writes will not be accepted as consistency cannot be guaranteed.
///
Expand Down
10 changes: 5 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ mod snapshot_nonce;
mod snapshot_tracker;
mod tracked_snapshot;

#[cfg(feature = "single_writer_tx")]
#[cfg(any(feature = "single_writer_tx", feature = "ssi_tx"))]
mod tx;

mod version;
Expand All @@ -127,7 +127,7 @@ pub use {
version::Version,
};

#[cfg(feature = "single_writer_tx")]
#[cfg(any(feature = "single_writer_tx", feature = "ssi_tx"))]
pub use tx::{
keyspace::{TransactionalKeyspace, TxKeyspace},
partition::TransactionalPartitionHandle,
Expand All @@ -142,15 +142,15 @@ pub type WriteBatch = Batch;
pub type Partition = PartitionHandle;

/// Alias for [`TransactionalPartitionHandle`]
#[cfg(feature = "single_writer_tx")]
#[cfg(any(feature = "single_writer_tx", feature = "ssi_tx"))]
pub type TxPartition = TransactionalPartitionHandle;

/// Alias for [`TransactionalPartitionHandle`]
#[cfg(feature = "single_writer_tx")]
#[cfg(any(feature = "single_writer_tx", feature = "ssi_tx"))]
pub type TxPartitionHandle = TransactionalPartitionHandle;

/// Alias for [`TransactionalPartitionHandle`]
#[cfg(feature = "single_writer_tx")]
#[cfg(any(feature = "single_writer_tx", feature = "ssi_tx"))]
pub type TransactionalPartition = TransactionalPartitionHandle;

/// A snapshot moment
Expand Down
2 changes: 1 addition & 1 deletion src/snapshot_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl Default for SnapshotTrackerInner {
fn default() -> Self {
Self {
data: DashMap::default(),
safety_gap: 100,
safety_gap: 50,
lowest_freed_instant: RwLock::default(),
}
}
Expand Down
183 changes: 183 additions & 0 deletions src/tx/conflict_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
use crate::batch::PartitionKey;
use core::ops::Bound;
use lsm_tree::Slice;
use std::{
collections::{BTreeMap, BTreeSet},
ops::RangeBounds,
};

#[derive(Clone, Debug)]
enum Read {
Single(Slice),
Range {
start: Bound<Slice>,
end: Bound<Slice>,
},
All,
}

#[derive(Default, Debug)]
pub struct ConflictManager {
reads: BTreeMap<PartitionKey, Vec<Read>>,
conflict_keys: BTreeMap<PartitionKey, BTreeSet<Slice>>,
}

impl ConflictManager {
fn push_read(&mut self, partition: &PartitionKey, read: Read) {
if let Some(tbl) = self.reads.get_mut(partition) {
tbl.push(read);
} else {
self.reads.entry(partition.clone()).or_default().push(read);
}
}

pub fn mark_read(&mut self, partition: &PartitionKey, key: &Slice) {
self.push_read(partition, Read::Single(key.clone()));
}

pub fn mark_conflict(&mut self, partition: &PartitionKey, key: &[u8]) {
if let Some(tbl) = self.conflict_keys.get_mut(partition) {
tbl.insert(key.into());
} else {
self.conflict_keys
.entry(partition.clone())
.or_default()
.insert(key.into());
}
}

pub fn mark_range(&mut self, partition: &PartitionKey, range: impl RangeBounds<Slice>) {
let start = match range.start_bound() {
Bound::Included(k) => Bound::Included(k.clone()),
Bound::Excluded(k) => Bound::Excluded(k.clone()),
Bound::Unbounded => Bound::Unbounded,
};

let end = match range.end_bound() {
Bound::Included(k) => Bound::Included(k.clone()),
Bound::Excluded(k) => Bound::Excluded(k.clone()),
Bound::Unbounded => Bound::Unbounded,
};

let read = if start == Bound::Unbounded && end == Bound::Unbounded {
Read::All
} else {
Read::Range { start, end }
};

self.push_read(partition, read);
}

#[allow(clippy::too_many_lines)]
pub fn has_conflict(&self, other: &Self) -> bool {
if self.reads.is_empty() {
return false;
}

for (partition, keys) in &self.reads {
if let Some(other_conflict_keys) = other.conflict_keys.get(partition) {
for ro in keys {
match ro {
Read::Single(k) => {
if other_conflict_keys.contains(k) {
return true;
}
}
Read::Range { start, end } => match (start, end) {
(Bound::Included(start), Bound::Included(end)) => {
if other_conflict_keys
.range::<Slice, _>((
Bound::Included(start),
Bound::Included(end),
))
.next()
.is_some()
{
return true;
}
}
(Bound::Included(start), Bound::Excluded(end)) => {
if other_conflict_keys
.range::<Slice, _>((
Bound::Included(start),
Bound::Excluded(end),
))
.next()
.is_some()
{
return true;
}
}
(Bound::Included(start), Bound::Unbounded) => {
if other_conflict_keys
.range::<Slice, _>((Bound::Included(start), Bound::Unbounded))
.next()
.is_some()
{
return true;
}
}
(Bound::Excluded(start), Bound::Included(end)) => {
if other_conflict_keys
.range::<Slice, _>((
Bound::Excluded(start),
Bound::Included(end),
))
.next()
.is_some()
{
return true;
}
}
(Bound::Excluded(start), Bound::Excluded(end)) => {
if other_conflict_keys
.range::<Slice, _>((
Bound::Excluded(start),
Bound::Excluded(end),
))
.next()
.is_some()
{
return true;
}
}
(Bound::Excluded(start), Bound::Unbounded) => {
if other_conflict_keys
.range::<Slice, _>((Bound::Excluded(start), Bound::Unbounded))
.next()
.is_some()
{
return true;
}
}
(Bound::Unbounded, Bound::Included(end)) => {
let range = ..=end;
for write in other_conflict_keys {
if range.contains(&write) {
return true;
}
}
}
(Bound::Unbounded, Bound::Excluded(end)) => {
let range = ..end;
for write in other_conflict_keys {
if range.contains(&write) {
return true;
}
}
}
(Bound::Unbounded, Bound::Unbounded) => unreachable!(),
},
Read::All => {
if !other_conflict_keys.is_empty() {
return true;
}
}
}
}
}
}

false
}
}
Loading