Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: Load large objects eagerly during log replay #594

Merged
merged 1 commit into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 89 additions & 12 deletions crates/core/src/db/commit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::{
datastore::traits::{MutTxDatastore, TxData},
message_log::{self, MessageLog},
messages::commit::Commit,
ostorage::ObjectDB,
ostorage::{memory_object_db::MemoryObjectDB, ObjectDB},
FsyncPolicy,
};
use crate::{
Expand All @@ -20,8 +20,11 @@ use crate::{
use anyhow::Context;
use spacetimedb_sats::hash::{hash_bytes, Hash};
use spacetimedb_sats::DataKey;
use std::io;
use std::sync::{Arc, Mutex, MutexGuard};
use std::{
collections::{hash_map, HashMap},
io,
sync::{Arc, Mutex, MutexGuard},
};

/// A read-only handle to the commit log.
#[derive(Clone)]
Expand Down Expand Up @@ -52,6 +55,12 @@ impl CommitLog {
/// Traverse the log from the start, calling `F` with each [`Commit`]
/// encountered.
///
/// The second parameter to `F` is an [`ObjectDB`] which is guaranteed to
/// contain all non-inline objects referenced from the corresponding commit.
/// If a commit is encountered for which an object cannot be resolved from
/// the [`CommitLog`]s underlying object storage, `replay` aborts and `F` is
/// not called.
///
/// The traversal performs some consistency checks, and _may_ perform error
/// correction on the persistent log before returning.
///
Expand All @@ -63,8 +72,7 @@ impl CommitLog {
/// and can thus safely be written to via the resulting [`CommitLogMut`].
pub fn replay<F>(&self, mut f: F) -> Result<CommitLogMut, DBError>
where
// TODO(kim): `&dyn ObjectDB` should suffice
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, so do you :)

F: FnMut(Commit, Arc<Mutex<Box<dyn ObjectDB + Send>>>) -> Result<(), DBError>,
F: FnMut(Commit, &dyn ObjectDB) -> Result<(), DBError>,
{
let unwritten_commit = {
let mut mlog = self.mlog.lock().unwrap();
Expand All @@ -75,14 +83,16 @@ impl CommitLog {
last_commit_offset: None,
last_hash: None,

odb: self.odb.clone(),

segments,
segment_offset: 0,
current_segment: None,
};

for commit in &mut iter {
match commit {
Ok(commit) => f(commit, self.odb.clone())?,
Ok((commit, objects)) => f(commit, &objects)?,
Err(ReplayError::Other { source }) => return Err(source.into()),

// We expect that partial writes can occur at the end of a
Expand Down Expand Up @@ -117,12 +127,32 @@ impl CommitLog {
}
.into());
}
Err(ReplayError::MissingObject {
segment_offset,
last_commit_offset,
hash,
referenced_from_commit_offset,
}) if segment_offset < total_segments - 1 => {
log::warn!(
"Missing object {} referenced from {}",
hash,
referenced_from_commit_offset
);
return Err(LogReplayError::TrailingSegments {
segment_offset,
total_segments,
commit_offset: last_commit_offset,
source: io::Error::new(io::ErrorKind::Other, "Missing object"),
}
.into());
}

// We are near the end of the log, so trim it to the known-
// good prefix.
Err(
ReplayError::OutOfOrder { last_commit_offset, .. }
| ReplayError::CorruptedData { last_commit_offset, .. },
| ReplayError::CorruptedData { last_commit_offset, .. }
| ReplayError::MissingObject { last_commit_offset, .. },
) => {
mlog.reset_to(last_commit_offset)
.map_err(|source| LogReplayError::Reset {
Expand Down Expand Up @@ -490,12 +520,46 @@ struct Replay {
last_commit_offset: Option<u64>,
last_hash: Option<Hash>,

odb: Arc<Mutex<Box<dyn ObjectDB + Send>>>,

segments: message_log::Segments,
segment_offset: usize,

current_segment: Option<IterSegment>,
}

impl Replay {
    /// Eagerly resolve every non-inline object referenced from `commit`.
    ///
    /// Walks all writes in the commit's transactions and, for each
    /// [`DataKey::Hash`] key, looks the object up in the underlying object
    /// storage. On success, returns a [`MemoryObjectDB`] holding all resolved
    /// objects; if any object cannot be found, returns
    /// [`ReplayError::MissingObject`] identifying the offending hash and the
    /// commit which referenced it.
    fn collect_objects(&self, commit: &Commit) -> Result<MemoryObjectDB, ReplayError> {
        let store = self.odb.lock().unwrap();
        let mut resolved = HashMap::new();

        for tx in &commit.transactions {
            for write in &tx.writes {
                // Inline data keys carry their payload directly; only
                // hash-addressed keys need to be fetched from the object db.
                if let DataKey::Hash(hash) = write.data_key {
                    // Fetch each distinct hash at most once.
                    if let hash_map::Entry::Vacant(slot) = resolved.entry(hash) {
                        let bytes = store.get(hash).ok_or(ReplayError::MissingObject {
                            segment_offset: self.segment_offset,
                            last_commit_offset: self.last_commit_offset.unwrap_or_default(),
                            hash,
                            referenced_from_commit_offset: commit.commit_offset,
                        })?;
                        slot.insert(bytes);
                    }
                }
            }
        }

        Ok(resolved.into())
    }
}

enum ReplayError {
/// A [`Commit`] was decoded successfully, but is not contiguous.
///
Expand Down Expand Up @@ -526,6 +590,17 @@ enum ReplayError {
last_commit_offset: u64,
source: io::Error,
},
/// An object referenced from a [`Commit`] was not found in the object db.
///
/// This error may occur in [`FsyncPolicy::Never`] mode, if the object db
/// happened to not be flushed to disk but the corresponding message log
/// write was.
MissingObject {
segment_offset: usize,
last_commit_offset: u64,
hash: Hash,
referenced_from_commit_offset: u64,
},
/// Some other error occurred.
///
/// May be a transient error. Processing should be aborted, and potentially
Expand All @@ -534,7 +609,7 @@ enum ReplayError {
}

impl Iterator for Replay {
type Item = Result<Commit, ReplayError>;
type Item = Result<(Commit, MemoryObjectDB), ReplayError>;

fn next(&mut self) -> Option<Self::Item> {
if let Some(cur) = self.current_segment.as_mut() {
Expand Down Expand Up @@ -563,11 +638,13 @@ impl Iterator for Replay {
expected,
})
} else {
self.last_commit_offset = Some(commit.commit_offset);
self.last_hash = commit.parent_commit_hash;
self.tx_offset += commit.transactions.len() as u64;
self.collect_objects(&commit).map(|objects| {
self.last_commit_offset = Some(commit.commit_offset);
self.last_hash = commit.parent_commit_hash;
self.tx_offset += commit.transactions.len() as u64;

Ok(commit)
(commit, objects)
})
}
}

Expand Down
8 changes: 2 additions & 6 deletions crates/core/src/db/datastore/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1632,11 +1632,7 @@ impl Locking {
.rows
}

pub fn replay_transaction(
&self,
transaction: &Transaction,
odb: Arc<std::sync::Mutex<Box<dyn ObjectDB + Send>>>,
) -> Result<(), DBError> {
pub fn replay_transaction(&self, transaction: &Transaction, odb: &dyn ObjectDB) -> Result<(), DBError> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the removal of the Send bound as a side effect.

let mut inner = self.inner.lock();
for write in &transaction.writes {
let table_id = TableId(write.set_id);
Expand All @@ -1660,7 +1656,7 @@ impl Locking {
)
}),
DataKey::Hash(hash) => {
let data = odb.lock().unwrap().get(hash).unwrap_or_else(|| {
let data = odb.get(hash).unwrap_or_else(|| {
panic!("Object {hash} referenced from transaction not present in object DB");
});
ProductValue::decode(&row_type, &mut &data[..]).unwrap_or_else(|e| {
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/db/ostorage/memory_object_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,9 @@ impl ObjectDB for MemoryObjectDB {
Ok(0)
}
}

impl From<HashMap<Hash, Bytes>> for MemoryObjectDB {
fn from(objects: HashMap<Hash, Bytes>) -> Self {
Self { objects }
}
}
2 changes: 1 addition & 1 deletion crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl RelationalDB {
let commit_log = commit_log.replay(|commit, odb| {
transaction_offset += commit.transactions.len();
for transaction in commit.transactions {
datastore.replay_transaction(&transaction, odb.clone())?;
datastore.replay_transaction(&transaction, odb)?;
}

let percentage =
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use spacetimedb_lib::{PrimaryKey, ProductValue};
use spacetimedb_primitives::{ColId, IndexId, TableId};
use spacetimedb_sats::db::def::IndexDef;
use spacetimedb_sats::db::error::{LibError, RelationError, SchemaError};
use spacetimedb_sats::hash::Hash;
use spacetimedb_sats::product_value::InvalidFieldError;
use spacetimedb_sats::relation::FieldName;
use spacetimedb_sats::satn::Satn;
Expand Down Expand Up @@ -237,6 +238,8 @@ pub enum LogReplayError {
#[source]
source: io::Error,
},
#[error("Missing object {} referenced from commit {}", .hash, .commit_offset)]
MissingObject { hash: Hash, commit_offset: u64 },
#[error(
"Unexpected I/O error reading commit {} from segment {}: {}",
.commit_offset,
Expand Down