From 498bda77641c1cd1d20017056946519cd6d0183d Mon Sep 17 00:00:00 2001
From: Kim Altintop
Date: Mon, 13 Nov 2023 13:57:39 +0100
Subject: [PATCH] core: Move replay + error correction logic to commit log

Makes `CommitLog` a read-only handle and introduces `CommitLogMut`, which
allows appending. This better encapsulates the error detection and trimming
logic, and ensures it runs before the log can be mutated.
---
 crates/core/src/database_instance_context.rs |  16 +-
 crates/core/src/db/commit_log.rs             | 470 +++++++++++++------
 crates/core/src/db/relational_db.rs          | 208 +++----
 3 files changed, 405 insertions(+), 289 deletions(-)

diff --git a/crates/core/src/database_instance_context.rs b/crates/core/src/database_instance_context.rs
index 6c1f3896f1..8b0b7028d4 100644
--- a/crates/core/src/database_instance_context.rs
+++ b/crates/core/src/database_instance_context.rs
@@ -97,12 +97,24 @@
     /// The number of bytes on disk occupied by the [MessageLog].
     pub fn message_log_size_on_disk(&self) -> Result<u64, DBError> {
-        self.relational_db.commit_log().message_log_size_on_disk()
+        let size = self
+            .relational_db
+            .commit_log()
+            .map(|commit_log| commit_log.message_log_size_on_disk())
+            .transpose()?;
+
+        Ok(size.unwrap_or_default())
     }
 
     /// The number of bytes on disk occupied by the [ObjectDB].
     pub fn object_db_size_on_disk(&self) -> Result<u64, DBError> {
-        self.relational_db.commit_log().object_db_size_on_disk()
+        let size = self
+            .relational_db
+            .commit_log()
+            .map(|commit_log| commit_log.object_db_size_on_disk())
+            .transpose()?;
+
+        Ok(size.unwrap_or_default())
     }
 
     /// The size of the log file.
diff --git a/crates/core/src/db/commit_log.rs b/crates/core/src/db/commit_log.rs
index 06155adde8..54fe652cad 100644
--- a/crates/core/src/db/commit_log.rs
+++ b/crates/core/src/db/commit_log.rs
@@ -13,7 +13,7 @@ use crate::{
             write::{Operation, Write},
         },
     },
-    error::DBError,
+    error::{DBError, LogReplayError},
     execution_context::ExecutionContext,
 };
@@ -27,27 +27,232 @@ use std::io;
 use std::sync::Arc;
 use std::sync::{Mutex, MutexGuard};
 
+/// A read-only handle to the commit log.
 #[derive(Clone)]
 pub struct CommitLog {
-    mlog: Option<Arc<Mutex<MessageLog>>>,
+    mlog: Arc<Mutex<MessageLog>>,
     odb: Arc<Mutex<Box<dyn ObjectDB + Send>>>,
-    unwritten_commit: Arc<Mutex<Commit>>,
-    fsync: bool,
 }
 
 impl CommitLog {
-    pub fn new(
-        mlog: Option<Arc<Mutex<MessageLog>>>,
-        odb: Arc<Mutex<Box<dyn ObjectDB + Send>>>,
-        unwritten_commit: Commit,
-        fsync: bool,
-    ) -> Self {
-        Self {
-            mlog,
-            odb,
+    pub const fn new(mlog: Arc<Mutex<MessageLog>>, odb: Arc<Mutex<Box<dyn ObjectDB + Send>>>) -> Self {
+        Self { mlog, odb }
+    }
+
+    pub fn max_commit_offset(&self) -> u64 {
+        self.mlog.lock().unwrap().open_segment_max_offset
+    }
+
+    /// Obtain a [`CommitLogMut`], which permits write access.
+    ///
+    /// Equivalent to `self.replay(|_, _| Ok(()))`.
+    pub fn to_mut(&self) -> Result<CommitLogMut, DBError> {
+        self.replay(|_, _| Ok(()))
+    }
+
+    /// Traverse the log from the start, calling `F` with each [`Commit`]
+    /// encountered.
+    ///
+    /// The traversal performs some consistency checks, and _may_ perform error
+    /// correction on the persistent log before returning.
+    ///
+    /// Currently, this method is the only way to ensure the log is consistent,
+    /// and can thus safely be written to via the resulting [`CommitLogMut`].
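+    ///
+    /// # Example
+    ///
+    /// A minimal sketch of the intended call pattern. `mlog` and `odb` are
+    /// hypothetical handles to an already-opened [`MessageLog`] and
+    /// [`ObjectDB`]; error handling is elided:
+    ///
+    /// ```ignore
+    /// let log = CommitLog::new(mlog, odb);
+    /// // Validate (and possibly trim) the log before accepting writes.
+    /// let writable: CommitLogMut = log.replay(|commit, _odb| {
+    ///     // apply `commit` to the datastore here
+    ///     Ok(())
+    /// })?;
+    /// ```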
+    pub fn replay<F>(&self, mut f: F) -> Result<CommitLogMut, DBError>
+    where
+        // TODO(kim): `&dyn ObjectDB` should suffice
+        F: FnMut(Commit, Arc<Mutex<Box<dyn ObjectDB + Send>>>) -> Result<(), DBError>,
+    {
+        let unwritten_commit = {
+            let mut mlog = self.mlog.lock().unwrap();
+            let total_segments = mlog.total_segments();
+            let segments = mlog.segments();
+            let mut iter = Replay {
+                tx_offset: 0,
+                last_commit_offset: None,
+                last_hash: None,
+
+                segments,
+                segment_offset: 0,
+                current_segment: None,
+            };
+
+            for commit in &mut iter {
+                match commit {
+                    Ok(commit) => f(commit, self.odb.clone())?,
+                    Err(ReplayError::Other { source }) => return Err(source.into()),
+
+                    // We expect that partial writes can occur at the end of a
+                    // segment. Trimming the log is, however, only safe if we're
+                    // at the end of the _log_.
+                    Err(ReplayError::OutOfOrder {
+                        segment_offset,
+                        last_commit_offset,
+                        decoded_commit_offset,
+                        expected,
+                    }) if segment_offset < total_segments - 1 => {
+                        log::warn!("Out-of-order commit {}, expected {}", decoded_commit_offset, expected);
+                        return Err(LogReplayError::TrailingSegments {
+                            segment_offset,
+                            total_segments,
+                            commit_offset: last_commit_offset,
+                            source: io::Error::new(io::ErrorKind::Other, "Out-of-order commit"),
+                        }
+                        .into());
+                    }
+                    Err(ReplayError::CorruptedData {
+                        segment_offset,
+                        last_commit_offset: commit_offset,
+                        source,
+                    }) if segment_offset < total_segments - 1 => {
+                        log::warn!("Corrupt commit after offset {}", commit_offset);
+                        return Err(LogReplayError::TrailingSegments {
+                            segment_offset,
+                            total_segments,
+                            commit_offset,
+                            source,
+                        }
+                        .into());
+                    }
+
+                    // We are near the end of the log, so trim it to the known-
+                    // good prefix.
+                    Err(
+                        ReplayError::OutOfOrder { last_commit_offset, .. }
+                        | ReplayError::CorruptedData { last_commit_offset, .. },
+                    ) => {
+                        mlog.reset_to(last_commit_offset)
+                            .map_err(|source| LogReplayError::Reset {
+                                offset: last_commit_offset,
+                                source,
+                            })?;
+                        break;
+                    }
+                }
+            }
+
+            Commit {
+                parent_commit_hash: iter.last_hash,
+                commit_offset: iter.last_commit_offset.map(|off| off + 1).unwrap_or_default(),
+                min_tx_offset: iter.tx_offset,
+                transactions: Vec::new(),
+            }
+        };
+
+        Ok(CommitLogMut {
+            mlog: self.mlog.clone(),
+            odb: self.odb.clone(),
             unwritten_commit: Arc::new(Mutex::new(unwritten_commit)),
-            fsync,
+            fsync: false,
+        })
+    }
+
+    /// The number of bytes on disk occupied by the [MessageLog].
+    pub fn message_log_size_on_disk(&self) -> Result<u64, DBError> {
+        let guard = self.mlog.lock().unwrap();
+        Ok(guard.size())
+    }
+
+    /// The number of bytes on disk occupied by the [ObjectDB].
+    pub fn object_db_size_on_disk(&self) -> Result<u64, DBError> {
+        let guard = self.odb.lock().unwrap();
+        guard.size_on_disk()
+    }
+
+    /// Obtain an iterator over a snapshot of the raw message log segments.
+    ///
+    /// See also: [`MessageLog::segments`]
+    pub fn message_log_segments(&self) -> message_log::Segments {
+        self.message_log_segments_from(0)
+    }
+
+    /// Obtain an iterator over a snapshot of the raw message log segments
+    /// containing messages equal to or newer than `offset`.
+    ///
+    /// See [`MessageLog::segments_from`] for more information.
+    pub fn message_log_segments_from(&self, offset: u64) -> message_log::Segments {
+        let mlog = self.mlog.lock().unwrap();
+        mlog.segments_from(offset)
+    }
+
+    /// Obtain an iterator over the [`Commit`]s in the log.
+    ///
+    /// The iterator represents a snapshot of the log.
+    pub fn iter(&self) -> Iter {
+        self.iter_from(0)
+    }
+
+    /// Obtain an iterator over the [`Commit`]s in the log, starting at `offset`.
+    ///
+    /// The iterator represents a snapshot of the log.
+    ///
+    /// Note that [`Commit`]s with an offset _smaller_ than `offset` may be
+    /// yielded if the offset doesn't fall on a segment boundary, due to the
+    /// lack of slicing support.
+    ///
+    /// See [`MessageLog::segments_from`] for more information.
+    pub fn iter_from(&self, offset: u64) -> Iter {
+        self.message_log_segments_from(offset).into()
+    }
+
+    /// Obtain an iterator over the large objects in [`Commit`], if any.
+    ///
+    /// Large objects are stored in the [`ObjectDB`], and are referenced from
+    /// the transactions in a [`Commit`].
+    ///
+    /// The iterator attempts to read each large object in turn, yielding an
+    /// [`io::Error`] with kind [`io::ErrorKind::NotFound`] if the object was
+    /// not found.
+    //
+    // TODO(kim): We probably want a more efficient way to stream the contents
+    // of the ODB over the network for replication purposes.
+    pub fn commit_objects<'a>(&self, commit: &'a Commit) -> impl Iterator<Item = io::Result<bytes::Bytes>> + 'a {
+        fn hashes(tx: &Arc<Transaction>) -> impl Iterator<Item = Hash> + '_ {
+            tx.writes.iter().filter_map(|write| {
+                if let DataKey::Hash(h) = write.data_key {
+                    Some(h)
+                } else {
+                    None
+                }
+            })
+        }
+
+        let odb = self.odb.clone();
+        commit.transactions.iter().flat_map(hashes).map(move |hash| {
+            let odb = odb.lock().unwrap();
+            odb.get(hash)
+                .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, format!("Missing object: {hash}")))
+        })
+    }
+}
+
+/// A mutable handle to the commit log.
+///
+/// "Mutable" specifically means that new commits can be appended to the log
+/// via [`CommitLogMut::append_tx`].
+///
+/// A [`CommitLog`] can be obtained from [`CommitLogMut`] via the [`From`] impl.
+#[derive(Clone)]
+pub struct CommitLogMut {
+    mlog: Arc<Mutex<MessageLog>>,
+    odb: Arc<Mutex<Box<dyn ObjectDB + Send>>>,
+    unwritten_commit: Arc<Mutex<Commit>>,
+    // TODO(kim): Use FsyncPolicy instead of bool.
+    fsync: bool,
+}
+
+impl CommitLogMut {
+    pub fn set_fsync(&mut self, fsync: bool) {
+        self.fsync = fsync
+    }
+
+    pub fn with_fsync(mut self, fsync: bool) -> Self {
+        self.fsync = fsync;
+        self
+    }
+
+    pub fn commit_offset(&self) -> u64 {
+        self.mlog.lock().unwrap().open_segment_max_offset
     }
 
     /// Persist to disk the [Tx] result into the [MessageLog].
@@ -63,15 +268,11 @@
     where
         D: MutTxDatastore<RowId = RowId>,
     {
-        if let Some(mlog) = &self.mlog {
-            let mut mlog = mlog.lock().unwrap();
-            self.generate_commit(ctx, tx_data, datastore)
-                .as_deref()
-                .map(|bytes| self.append_commit_bytes(&mut mlog, bytes))
-                .transpose()
-        } else {
-            Ok(None)
-        }
+        let mut mlog = self.mlog.lock().unwrap();
+        self.generate_commit(ctx, tx_data, datastore)
+            .as_deref()
+            .map(|bytes| self.append_commit_bytes(&mut mlog, bytes))
+            .transpose()
     }
 
     // For testing -- doesn't require a `MutTxDatastore`, which is currently
@@ -181,102 +382,8 @@
 }
 
-/// A read-only view of a [`CommitLog`].
-pub struct CommitLogView {
-    mlog: Option<Arc<Mutex<MessageLog>>>,
-    odb: Arc<Mutex<Box<dyn ObjectDB + Send>>>,
-}
-
-impl CommitLogView {
-    /// The number of bytes on disk occupied by the [MessageLog].
-    pub fn message_log_size_on_disk(&self) -> Result<u64, DBError> {
-        if let Some(ref mlog) = self.mlog {
-            let guard = mlog.lock().unwrap();
-            Ok(guard.size())
-        } else {
-            Ok(0)
-        }
-    }
-
-    /// The number of bytes on disk occupied by the [ObjectDB].
-    pub fn object_db_size_on_disk(&self) -> Result<u64, DBError> {
-        let guard = self.odb.lock().unwrap();
-        guard.size_on_disk()
-    }
-
-    /// Obtain an iterator over a snapshot of the raw message log segments.
-    ///
-    /// See also: [`MessageLog::segments`]
-    pub fn message_log_segments(&self) -> message_log::Segments {
-        self.message_log_segments_from(0)
-    }
-
-    /// Obtain an iterator over a snapshot of the raw message log segments
-    /// containing messages equal to or newer than `offset`.
-    ///
-    /// See [`MessageLog::segments_from`] for more information.
-    pub fn message_log_segments_from(&self, offset: u64) -> message_log::Segments {
-        if let Some(mlog) = &self.mlog {
-            let mlog = mlog.lock().unwrap();
-            mlog.segments_from(offset)
-        } else {
-            message_log::Segments::empty()
-        }
-    }
-
-    /// Obtain an iterator over the [`Commit`]s in the log.
-    ///
-    /// The iterator represents a snapshot of the log.
-    pub fn iter(&self) -> Iter {
-        self.iter_from(0)
-    }
-
-    /// Obtain an iterator over the [`Commit`]s in the log, starting at `offset`.
-    ///
-    /// The iterator represents a snapshot of the log.
-    ///
-    /// Note that [`Commit`]s with an offset _smaller_ than `offset` may be
-    /// yielded if the offset doesn't fall on a segment boundary, due to the
-    /// lack of slicing support.
-    ///
-    /// See [`MessageLog::segments_from`] for more information.
-    pub fn iter_from(&self, offset: u64) -> Iter {
-        self.message_log_segments_from(offset).into()
-    }
-
-    /// Obtain an iterator over the large objects in [`Commit`], if any.
-    ///
-    /// Large objects are stored in the [`ObjectDB`], and are referenced from
-    /// the transactions in a [`Commit`].
-    ///
-    /// The iterator attempts to read each large object in turn, yielding an
-    /// [`io::Error`] with kind [`io::ErrorKind::NotFound`] if the object was
-    /// not found.
-    //
-    // TODO(kim): We probably want a more efficient way to stream the contents
-    // of the ODB over the network for replication purposes.
-    pub fn commit_objects<'a>(&self, commit: &'a Commit) -> impl Iterator<Item = io::Result<bytes::Bytes>> + 'a {
-        fn hashes(tx: &Arc<Transaction>) -> impl Iterator<Item = Hash> + '_ {
-            tx.writes.iter().filter_map(|write| {
-                if let DataKey::Hash(h) = write.data_key {
-                    Some(h)
-                } else {
-                    None
-                }
-            })
-        }
-
-        let odb = self.odb.clone();
-        commit.transactions.iter().flat_map(hashes).map(move |hash| {
-            let odb = odb.lock().unwrap();
-            odb.get(hash)
-                .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, format!("Missing object: {hash}")))
-        })
-    }
-}
-
-impl From<&CommitLog> for CommitLogView {
-    fn from(log: &CommitLog) -> Self {
+impl From<&CommitLogMut> for CommitLog {
+    fn from(log: &CommitLogMut) -> Self {
         Self {
             mlog: log.mlog.clone(),
             odb: log.odb.clone(),
@@ -320,10 +427,9 @@
     }
 }
 
-/// Iterator over a [`CommitLogView`], yielding [`Commit`]s.
+/// Iterator over a [`CommitLog`], yielding [`Commit`]s.
 ///
-/// Created by [`CommitLogView::iter`] and [`CommitLogView::iter_from`]
-/// respectively.
+/// Created by [`CommitLog::iter`] and [`CommitLog::iter_from`] respectively.
 #[must_use = "iterators are lazy and do nothing unless consumed"]
 pub struct Iter {
     commits: Option<IterSegment>,
@@ -361,6 +467,107 @@ impl From for IterSegment {
     }
 }
 
+/// Iterator created by [`CommitLog::replay`].
+///
+/// Similar to [`Iter`], but performs integrity checking and maintains
+/// additional state.
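+///
+/// Yields `Result<Commit, ReplayError>`, which lets [`CommitLog::replay`]
+/// decide per error variant whether the log can safely be trimmed or must
+/// be reported as corrupt.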
+#[must_use = "iterators are lazy and do nothing unless consumed"]
+struct Replay {
+    tx_offset: u64,
+    last_commit_offset: Option<u64>,
+    last_hash: Option<Hash>,
+
+    segments: message_log::Segments,
+    segment_offset: usize,
+
+    current_segment: Option<IterSegment>,
+}
+
+enum ReplayError {
+    OutOfOrder {
+        segment_offset: usize,
+        last_commit_offset: u64,
+        decoded_commit_offset: u64,
+        expected: u64,
+    },
+    CorruptedData {
+        segment_offset: usize,
+        last_commit_offset: u64,
+        source: io::Error,
+    },
+    Other {
+        source: io::Error,
+    },
+}
+
+impl Iterator for Replay {
+    type Item = Result<Commit, ReplayError>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if let Some(cur) = self.current_segment.as_mut() {
+            if let Some(commit) = cur.next() {
+                // We may be able to recover from a corrupt suffix of the log.
+                // For this, we need to consider three cases:
+                //
+                // 1. The `Commit` was decoded successfully, but is invalid
+                // 2. The `Commit` failed to decode
+                // 3. The underlying `MessageLog` reported an unexpected EOF
+                //
+                // Case 1. can occur because the on-disk format does not
+                // currently have any consistency checks built in. To detect it,
+                // we check that the `commit_offset` sequence is contiguous.
+                //
+                // TODO(kim): We should probably check the `parent_commit_hash`
+                // instead, but only after measuring the performance overhead.
+                let res = match commit {
+                    Ok(commit) => {
+                        let expected = self.last_commit_offset.map(|last| last + 1).unwrap_or_default();
+                        if commit.commit_offset != expected {
+                            Err(ReplayError::OutOfOrder {
+                                segment_offset: self.segment_offset,
+                                last_commit_offset: self.last_commit_offset.unwrap_or_default(),
+                                decoded_commit_offset: commit.commit_offset,
+                                expected,
+                            })
+                        } else {
+                            self.last_commit_offset = Some(commit.commit_offset);
+                            self.last_hash = commit.parent_commit_hash;
+                            self.tx_offset += commit.transactions.len() as u64;
+
+                            Ok(commit)
+                        }
+                    }
+
+                    Err(e) => {
+                        let err = match e.kind() {
+                            io::ErrorKind::InvalidData | io::ErrorKind::UnexpectedEof => ReplayError::CorruptedData {
+                                segment_offset: self.segment_offset,
+                                last_commit_offset: self.last_commit_offset.unwrap_or_default(),
+                                source: e,
+                            },
+                            _ => ReplayError::Other { source: e },
+                        };
+                        Err(err)
+                    }
+                };
+
+                return Some(res);
+            }
+        }
+
+        // Pop the next segment, if available.
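+        // If the log is exhausted, iteration ends here. A successfully opened
+        // segment is consumed by recursing into `next()`; a failure to open it
+        // is surfaced as `ReplayError::Other`.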
+        let next_segment = self.segments.next()?;
+        self.segment_offset += 1;
+        match next_segment.try_into_iter().map(IterSegment::from) {
+            Ok(current_segment) => {
+                self.current_segment = Some(current_segment);
+                self.next()
+            }
+            Err(e) => Some(Err(ReplayError::Other { source: e })),
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -411,26 +618,19 @@
             .unwrap();
         let odb = MemoryObjectDB::default();
 
-        let log = CommitLog::new(
-            Some(Arc::new(Mutex::new(mlog))),
-            Arc::new(Mutex::new(Box::new(odb))),
-            Commit {
-                parent_commit_hash: None,
-                commit_offset: 0,
-                min_tx_offset: 0,
-                transactions: Vec::new(),
-            },
-            true, // fsync
-        );
+        let log = CommitLog::new(Arc::new(Mutex::new(mlog)), Arc::new(Mutex::new(Box::new(odb))))
+            .to_mut()
+            .unwrap()
+            .with_fsync(true);
 
         {
-            let mut guard = log.mlog.as_ref().unwrap().lock().unwrap();
+            let mut guard = log.mlog.lock().unwrap();
             for _ in 0..TOTAL_MESSAGES {
                 log.append_commit_bytes(&mut guard, &commit_bytes).unwrap();
             }
         }
 
-        let view = CommitLogView::from(&log);
+        let view = CommitLog::from(&log);
         let commits = view.iter().map(Result::unwrap).count();
         assert_eq!(TOTAL_MESSAGES, commits);
diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs
index 5b20e7afca..b1ec446262 100644
--- a/crates/core/src/db/relational_db.rs
+++ b/crates/core/src/db/relational_db.rs
@@ -1,4 +1,4 @@
-use super::commit_log::{CommitLog, CommitLogView};
+use super::commit_log::{CommitLog, CommitLogMut};
 use super::datastore::locking_tx_datastore::{DataRef, Iter, IterByColEq, IterByColRange, Locking, MutTxId, RowId};
 use super::datastore::traits::{
     DataRow, IndexDef, MutProgrammable, MutTx, MutTxDatastore, Programmable, SequenceDef, TableDef, TableSchema, TxData,
@@ -8,10 +8,9 @@ use super::ostorage::memory_object_db::MemoryObjectDB;
 use super::relational_operators::Relation;
 use crate::address::Address;
 use crate::db::db_metrics::DB_METRICS;
-use crate::db::messages::commit::Commit;
 use crate::db::ostorage::hashmap_object_db::HashMapObjectDB;
 use crate::db::ostorage::ObjectDB;
-use crate::error::{DBError, DatabaseError, IndexError, LogReplayError, TableError};
+use crate::error::{DBError, DatabaseError, IndexError, TableError};
 use crate::execution_context::ExecutionContext;
 use crate::hash::Hash;
 use fs2::FileExt;
@@ -22,7 +21,6 @@ use spacetimedb_primitives::{ColId, IndexId, SequenceId, TableId};
 use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue};
 use std::borrow::Cow;
 use std::fs::{create_dir_all, File};
-use std::io;
 use std::ops::RangeBounds;
 use std::path::Path;
 use std::sync::{Arc, Mutex};
@@ -43,7 +41,7 @@ pub const ST_TABLE_ID_START: TableId = TableId(2);
 pub struct RelationalDB {
     // TODO(cloutiertyler): This should not be public
     pub(crate) inner: Locking,
-    commit_log: CommitLog,
+    commit_log: Option<CommitLogMut>,
     _lock: Arc<File>,
     address: Address,
 }
@@ -87,153 +85,54 @@
             .map_err(|err| DatabaseError::DatabasedOpened(root.to_path_buf(), err.into()))?;
         let datastore = Locking::bootstrap(db_address)?;
-        log::debug!("[{}] Replaying transaction log.", address);
-        let mut segment_index = 0;
-        let mut last_logged_percentage = 0;
-        let unwritten_commit = {
-            let mut transaction_offset = 0;
-            let mut last_commit_offset = None;
-            let mut last_hash: Option<Hash> = None;
-            if let Some(message_log) = &message_log {
-                let mut message_log = message_log.lock().unwrap();
-                let max_offset = message_log.open_segment_max_offset;
-                let total_segments = message_log.total_segments();
-
-                'replay: for (segment_offset, segment) in message_log.segments().enumerate() {
-                for commit in segment.try_into_iter_commits()? {
-                    let commit = match commit {
-                        Ok(commit) => {
-                            // We may decode a commit just fine, and detect
-                            // that it's not well-formed only later when
-                            // trying to decode the payload.
-                            // As a cheaper alternative to verifying the
-                            // parent hash, let's check if the commit
-                            // sequence is contiguous.
-                            let expected = last_commit_offset.map(|last| last + 1).unwrap_or(0);
-                            if commit.commit_offset != expected {
-                                let msg =
-                                    format!("Out-of-order commit {}, expected {}", commit.commit_offset, expected);
-                                log::warn!("{msg}");
-
-                                // If this is not at the end of the log,
-                                // report as corrupted but leave files as is
-                                // for forensics.
-                                if segment_offset < total_segments - 1 {
-                                    return Err(LogReplayError::TrailingSegments {
-                                        segment_offset,
-                                        total_segments,
-                                        commit_offset: last_commit_offset.unwrap_or_default(),
-                                        source: io::Error::new(io::ErrorKind::Other, msg),
-                                    }
-                                    .into());
-                                }
-
-                                let reset_offset = expected.saturating_sub(1);
-                                message_log
-                                    .reset_to(reset_offset)
-                                    .map_err(|source| LogReplayError::Reset {
-                                        offset: reset_offset,
-                                        source,
-                                    })?;
-                                break 'replay;
-                            } else {
-                                commit
-                            }
-                        }
-                        Err(e) if matches!(e.kind(), io::ErrorKind::InvalidData | io::ErrorKind::UnexpectedEof) => {
-                            log::warn!("Corrupt commit after offset {}", last_commit_offset.unwrap_or_default());
-
-                            // We expect that partial writes can occur at
-                            // the end of the log, but a corrupted segment
-                            // in the middle indicates something else is
-                            // wrong.
-                            if segment_offset < total_segments - 1 {
-                                return Err(LogReplayError::TrailingSegments {
-                                    segment_offset,
-                                    total_segments,
-                                    commit_offset: last_commit_offset.unwrap_or_default(),
-                                    source: e,
-                                }
-                                .into());
-                            }
-
-                            // Else, truncate the current segment. This
-                            // allows to continue appending to the log
-                            // without hitting above condition.
-                            let reset_offset = last_commit_offset.unwrap_or_default();
-                            message_log
-                                .reset_to(reset_offset)
-                                .map_err(|source| LogReplayError::Reset {
-                                    offset: reset_offset,
-                                    source,
-                                })?;
-                            break 'replay;
-                        }
-                        Err(e) => {
-                            return Err(LogReplayError::Io {
-                                segment_offset,
-                                commit_offset: last_commit_offset.unwrap_or_default(),
-                                source: e,
-                            }
-                            .into())
-                        }
-                    };
-                    segment_index += 1;
-                    last_hash = commit.parent_commit_hash;
-                    last_commit_offset = Some(commit.commit_offset);
-                    for transaction in commit.transactions {
-                        transaction_offset += 1;
-                        // NOTE: Although I am creating a blobstore transaction in a
-                        // one to one fashion for each message log transaction, this
-                        // is just to reduce memory usage while inserting. We don't
-                        // really care about inserting these transactionally as long
-                        // as all of the writes get inserted.
-                        datastore.replay_transaction(&transaction, odb.clone())?;
-
-                        let percentage = f64::floor((segment_index as f64 / max_offset as f64) * 100.0) as i32;
-                        if percentage > last_logged_percentage && percentage % 10 == 0 {
-                            last_logged_percentage = percentage;
-                            log::debug!(
-                                "[{}] Loaded {}% ({}/{})",
-                                address,
-                                percentage,
-                                transaction_offset,
-                                max_offset
-                            );
-                        }
-                    }
+        let mut transaction_offset = 0;
+        let commit_log = message_log
+            .map(|mlog| {
+                log::debug!("[{}] Replaying transaction log.", address);
+                let mut last_logged_percentage = 0;
+
+                let commit_log = CommitLog::new(mlog, odb);
+                let max_commit_offset = commit_log.max_commit_offset();
+
+                let commit_log = commit_log.replay(|commit, odb| {
+                    transaction_offset += commit.transactions.len();
+                    for transaction in commit.transactions {
+                        datastore.replay_transaction(&transaction, odb.clone())?;
                     }
-                }
-
-                // The purpose of this is to rebuild the state of the datastore
-                // after having inserted all of rows from the message log.
-                // This is necessary because, for example, inserting a row into `st_table`
-                // is not equivalent to calling `create_table`.
-                // There may eventually be better way to do this, but this will have to do for now.
-                datastore.rebuild_state_after_replay()?;
-            }
-            let commit_offset = if let Some(last_commit_offset) = last_commit_offset {
-                last_commit_offset + 1
-            } else {
-                0
-            };
+                    let percentage =
+                        f64::floor((commit.commit_offset as f64 / max_commit_offset as f64) * 100.0) as i32;
+                    if percentage > last_logged_percentage && percentage % 10 == 0 {
+                        last_logged_percentage = percentage;
+                        log::debug!(
+                            "[{}] Loaded {}% ({}/{})",
+                            address,
+                            percentage,
+                            transaction_offset,
+                            max_commit_offset
+                        );
+                    }
 
-            log::debug!(
-                "[{}] Initialized with {} commits and tx offset {}",
-                address,
-                commit_offset,
-                transaction_offset
-            );
-
-            Commit {
-                parent_commit_hash: last_hash,
-                commit_offset,
-                min_tx_offset: transaction_offset,
-                transactions: Vec::new(),
-            }
-        };
-        let commit_log = CommitLog::new(message_log, odb.clone(), unwritten_commit, fsync);
+                    Ok(())
+                })?;
+
+                Ok::<_, DBError>(commit_log.with_fsync(fsync))
+            })
+            .transpose()?;
+
+        // The purpose of this is to rebuild the state of the datastore
+        // after having inserted all of the rows from the message log.
+        // This is necessary because, for example, inserting a row into `st_table`
+        // is not equivalent to calling `create_table`.
+        // There may eventually be a better way to do this, but this will have to do for now.
+        datastore.rebuild_state_after_replay()?;
+
+        log::debug!(
+            "[{}] Initialized with {} commits and tx offset {}",
+            address,
+            commit_log.as_ref().map(|log| log.commit_offset()).unwrap_or_default(),
+            transaction_offset
+        );
 
         // i.e. essentially bootstrap the creation of the schema
         // tables by hard coding the schema of the schema tables
@@ -254,8 +153,8 @@
     }
 
     /// Obtain a read-only view of this database's [`CommitLog`].
-    pub fn commit_log(&self) -> CommitLogView {
-        CommitLogView::from(&self.commit_log)
+    pub fn commit_log(&self) -> Option<CommitLog> {
+        self.commit_log.as_ref().map(CommitLog::from)
     }
 
     #[tracing::instrument(skip_all)]
@@ -351,7 +250,12 @@
     pub fn commit_tx(&self, ctx: &ExecutionContext, tx: MutTxId) -> Result<Option<(TxData, Option<usize>)>, DBError> {
         log::trace!("COMMIT TX");
         if let Some(tx_data) = self.inner.commit_mut_tx(ctx, tx)? {
-            let bytes_written = self.commit_log.append_tx(ctx, &tx_data, &self.inner)?;
+            let bytes_written = self
+                .commit_log
+                .as_ref()
+                .map(|commit_log| commit_log.append_tx(ctx, &tx_data, &self.inner))
+                .transpose()?
+                .flatten();
             return Ok(Some((tx_data, bytes_written)));
         }
         Ok(None)