Skip to content

Commit

Permalink
Refactor commit encoding / decoding (#495)
Browse files Browse the repository at this point in the history
* core: Refactor commit encoding / decoding

Use `sats::{BufReader, BufWriter}` for decoding / encoding of `Commit`
and associated types. This makes `decode` fallible (which is quite
desirable, instead of panicking).

As the `DecodeError` from sats is fairly sparse, also add some context
about where exactly decoding failed.

Lastly, add some documentation and (property) tests.
  • Loading branch information
kim authored and kulakowski committed Nov 7, 2023
1 parent c42d482 commit 8e484e1
Show file tree
Hide file tree
Showing 7 changed files with 308 additions and 150 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ postgres-types = "0.2.5"
proc-macro2 = "1.0"
prometheus = "0.13.0"
proptest = "1.2.0"
proptest-derive = "0.4.0"
prost = "0.10"
prost-build = { version = "0.10" }
quick-junit = { version = "0.3.2" }
Expand Down
2 changes: 2 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ default = ["odb_sled"]
[dev-dependencies]
rusqlite.workspace = true
criterion.workspace = true
proptest.workspace = true
proptest-derive.workspace = true
rand.workspace = true

[build-dependencies]
Expand Down
9 changes: 3 additions & 6 deletions crates/core/src/db/commit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl CommitLog {
}
}

let mut bytes = Vec::new();
let mut bytes = Vec::with_capacity(unwritten_commit.encoded_len());
unwritten_commit.encode(&mut bytes);

unwritten_commit.parent_commit_hash = Some(hash_bytes(&bytes));
Expand Down Expand Up @@ -279,11 +279,8 @@ impl Iterator for IterSegment {

fn next(&mut self) -> Option<Self::Item> {
let next = self.inner.next()?;
Some(next.map(|bytes| {
// It seems very improbable that `decode` is infallible...
let (commit, _) = Commit::decode(bytes);
commit
}))
let io = |e| io::Error::new(io::ErrorKind::InvalidData, e);
Some(next.and_then(|bytes| Commit::decode(&mut bytes.as_slice()).map_err(io)))
}
}

Expand Down
207 changes: 136 additions & 71 deletions crates/core/src/db/messages/commit.rs
Original file line number Diff line number Diff line change
@@ -1,81 +1,119 @@
use anyhow::{bail, Context as _};
use spacetimedb_sats::buffer::{BufReader, BufWriter};
use std::{fmt, sync::Arc};

use super::transaction::Transaction;
use crate::hash::Hash;
use std::sync::Arc;

// aka "Block" from blockchain, aka RecordBatch, aka TxBatch
#[derive(Debug)]
#[cfg(test)]
use proptest::prelude::*;
#[cfg(test)]
use proptest_derive::Arbitrary;

/// A commit is one record in the write-ahead log.
///
/// Encoding:
///
/// ```text
/// [0u8 | 1u8<hash(32)>]<commit_offset(8)><min_tx_offset<8>[<transaction>...]
/// ```
#[derive(Debug, Default, PartialEq)]
#[cfg_attr(test, derive(Arbitrary))]
pub struct Commit {
/// The [`Hash`] over the encoded bytes of the previous commit, or `None` if
/// it is the very first commit.
#[cfg_attr(test, proptest(strategy = "arbitrary::parent_commit_hash()"))]
pub parent_commit_hash: Option<Hash>,
/// Counter of all commits in a log.
pub commit_offset: u64,
/// Counter of all transactions in a log.
///
/// That is, a per-log value which is incremented by `transactions.len()`
/// when the [`Commit`] is constructed.
pub min_tx_offset: u64,
/// The [`Transaction`]s in this commit, usually only one.
#[cfg_attr(test, proptest(strategy = "arbitrary::transactions()"))]
pub transactions: Vec<Arc<Transaction>>,
}

// TODO: Maybe a transaction buffer hash?
// commit: <parent_commit_hash(32)><commit_offset(8)><min_tx_offset(8)>[<transaction>...]*
impl Commit {
pub fn decode(bytes: impl AsRef<[u8]>) -> (Self, usize) {
let bytes = &mut bytes.as_ref();
if bytes.is_empty() {
return (
Commit {
parent_commit_hash: None,
commit_offset: 0,
min_tx_offset: 0,
transactions: Vec::new(),
},
0,
);
#[cfg(test)]
mod arbitrary {
use super::*;

// [`Hash`] is defined in `lib`, so we can't have an [`Arbitrary`] impl for
// it just yet due to orphan rules.
pub fn parent_commit_hash() -> impl Strategy<Value = Option<Hash>> {
any::<Option<[u8; 32]>>().prop_map(|maybe_hash| maybe_hash.map(|data| Hash { data }))
}

// Custom strategy to apply an upper bound on the number of [`Transaction`]s
// generated.
//
// We only ever commit a single transaction in practice.
pub fn transactions() -> impl Strategy<Value = Vec<Arc<Transaction>>> {
prop::collection::vec(any::<Arc<Transaction>>(), 1..8)
}
}

/// Error context for [`Commit::decode`]
enum Context {
Parent,
Hash,
CommitOffset,
MinTxOffset,
Transaction(usize),
}

impl fmt::Display for Context {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Failed to decode `Commit`: ")?;
match self {
Self::Parent => f.write_str("parent commit hash tag"),
Self::Hash => f.write_str("parent commit hash"),
Self::CommitOffset => f.write_str("commit offset"),
Self::MinTxOffset => f.write_str("min transaction offset"),
Self::Transaction(n) => f.write_str(&format!("transaction {n}")),
}
}
}

let mut read_count = 0;
impl Commit {
pub fn decode<'a>(reader: &mut impl BufReader<'a>) -> anyhow::Result<Self> {
if reader.remaining() == 0 {
return Ok(Self::default());
}

let parent_commit_hash = if bytes[read_count] != 0 {
read_count += 1;
let parent_commit_hash = Hash::from_slice(&bytes[read_count..read_count + 32]);
read_count += 32;
Some(parent_commit_hash)
} else {
read_count += 1;
None
let parent_commit_hash = match reader.get_u8().context(Context::Parent)? {
0 => None,
1 => reader
.get_array()
.map(|data| Hash { data })
.map(Some)
.context(Context::Hash)?,
x => bail!("Invalid tag for `Option<Hash>`: {x}"),
};

let mut dst = [0u8; 8];
dst.copy_from_slice(&bytes[read_count..read_count + 8]);
let commit_offset = u64::from_le_bytes(dst);
read_count += 8;

let mut dst = [0u8; 8];
dst.copy_from_slice(&bytes[read_count..read_count + 8]);
let min_tx_offset = u64::from_le_bytes(dst);
read_count += 8;

let mut transactions: Vec<Arc<Transaction>> = Vec::new();
while read_count < bytes.len() {
let (tx, read) = Transaction::decode(&bytes[read_count..]);
read_count += read;
transactions.push(Arc::new(tx));
let commit_offset = reader.get_u64().context(Context::CommitOffset)?;
let min_tx_offset = reader.get_u64().context(Context::MinTxOffset)?;
let mut transactions = Vec::new();
while reader.remaining() > 0 {
let tx = Transaction::decode(reader)
.map(Arc::new)
.with_context(|| Context::Transaction(transactions.len() + 1))?;
transactions.push(tx);
}

(
Commit {
parent_commit_hash,
commit_offset,
min_tx_offset,
transactions,
},
read_count,
)
Ok(Self {
parent_commit_hash,
commit_offset,
min_tx_offset,
transactions,
})
}

pub fn encoded_len(&self) -> usize {
let mut count = 0;

if self.parent_commit_hash.is_none() {
count += 1;
} else {
count += 1;
count += self.parent_commit_hash.unwrap().data.len();
let mut count = 1; // tag for option
if let Some(hash) = self.parent_commit_hash {
count += hash.data.len();
}

// 8 for commit_offset
Expand All @@ -91,21 +129,48 @@ impl Commit {
count
}

pub fn encode(&self, bytes: &mut Vec<u8>) {
bytes.reserve(self.encoded_len());

if self.parent_commit_hash.is_none() {
bytes.push(0);
} else {
bytes.push(1);
bytes.extend(self.parent_commit_hash.unwrap().data);
pub fn encode(&self, writer: &mut impl BufWriter) {
match self.parent_commit_hash {
Some(hash) => {
writer.put_u8(1);
writer.put_slice(&hash.data);
}
None => writer.put_u8(0),
}
writer.put_u64(self.commit_offset);
writer.put_u64(self.min_tx_offset);
for tx in &self.transactions {
tx.encode(writer);
}
}
}

bytes.extend(self.commit_offset.to_le_bytes());
bytes.extend(self.min_tx_offset.to_le_bytes());
#[cfg(test)]
mod tests {
use super::*;

proptest! {
// Generating arbitrary commits is quite slow, so limit to just a few
// cases.
//
// Note that this config applies to all `#[test]`s within the enclosing
// `proptest!`.
#![proptest_config(ProptestConfig::with_cases(64))]


#[test]
fn prop_commit_encoding_roundtrip(commit in any::<Commit>()) {
let mut buf = Vec::new();
commit.encode(&mut buf);
let decoded = Commit::decode(&mut buf.as_slice()).unwrap();
prop_assert_eq!(commit, decoded)
}

for tx in &self.transactions {
tx.encode(bytes);
#[test]
fn prop_encoded_len_is_encoded_len(commit in any::<Commit>()) {
let mut buf = Vec::new();
commit.encode(&mut buf);
prop_assert_eq!(buf.len(), commit.encoded_len())
}
}
}
Loading

0 comments on commit 8e484e1

Please sign in to comment.