Skip to content

Commit

Permalink
split Snapshotter to Snapshotter and SnapshotRecoverer, remove …
Browse files Browse the repository at this point in the history
…the now unused `SnapshotBuilder`.
  • Loading branch information
Rachelint committed Apr 19, 2023
1 parent 564352e commit 8ff80ed
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 76 deletions.
4 changes: 2 additions & 2 deletions analytic_engine/src/instance/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ use crate::{
const MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ: usize = 64;

pub(crate) struct ManifestStorages {
pub(crate) wal_manager: WalManagerRef,
pub(crate) oss_storage: ObjectStoreRef,
pub wal_manager: WalManagerRef,
pub oss_storage: ObjectStoreRef,
}

impl Instance {
Expand Down
108 changes: 41 additions & 67 deletions analytic_engine/src/manifest/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,24 +234,30 @@ impl TableSnapshotProvider for TableSnapshotProviderImpl {
}
}

/// Snapshot builder
#[async_trait]
trait SnapshotBuilder {
async fn build(&self) -> Result<Option<Snapshot>>;
}

/// Storage based snapshot builder
/// Snapshot recoverer
///
/// Usually, it recovers the snapshot from storage (like disk, oss, etc).
// TODO: remove `LogStore` and its related operations; they should be called directly rather
// than from within the `SnapshotRecoverer`.
#[derive(Debug, Clone)]
struct StorageSnapshotBuilder<LogStore, SnapshotStore> {
struct SnapshotRecoverer<LogStore, SnapshotStore> {
log_store: LogStore,
snapshot_store: SnapshotStore,
}

impl<LogStore, SnapshotStore> StorageSnapshotBuilder<LogStore, SnapshotStore>
impl<LogStore, SnapshotStore> SnapshotRecoverer<LogStore, SnapshotStore>
where
LogStore: MetaUpdateLogStore + Send + Sync,
SnapshotStore: MetaUpdateSnapshotStore + Send + Sync,
{
/// Recover the latest snapshot from storage.
///
/// Loads the persisted snapshot from `snapshot_store`. When one exists it is
/// passed to `create_latest_snapshot_with_prev` and the result is returned as
/// `Some`; otherwise the result of `create_latest_snapshot_without_prev` is
/// returned as-is. Any error from loading or from either helper is propagated.
async fn recover(&self) -> Result<Option<Snapshot>> {
    // Start from whatever snapshot has already been persisted, if any.
    let persisted = self.snapshot_store.load().await?;

    if let Some(prev_snapshot) = persisted {
        let latest = self
            .create_latest_snapshot_with_prev(prev_snapshot)
            .await?;
        return Ok(Some(latest));
    }

    // Nothing persisted yet: fall back to the path that has no previous snapshot.
    self.create_latest_snapshot_without_prev().await
}

async fn create_latest_snapshot_with_prev(&self, prev_snapshot: Snapshot) -> Result<Snapshot> {
let log_start_boundary = ReadBoundary::Excluded(prev_snapshot.end_seq);
let mut reader = self.log_store.scan(log_start_boundary).await?;
Expand Down Expand Up @@ -279,37 +285,34 @@ where

let mut latest_seq = SequenceNumber::MIN;
let mut manifest_data_builder = TableManifestDataBuilder::default();
let mut has_logs = false;
while let Some((seq, update)) = reader.next_update().await? {
latest_seq = seq;
manifest_data_builder
.apply_update(update)
.context(ApplyUpdate)?;
has_logs = true;
}

Ok(Some(Snapshot {
end_seq: latest_seq,
data: manifest_data_builder.build(),
}))
}
}

#[async_trait]
impl<LogStore, SnapshotStore> SnapshotBuilder for StorageSnapshotBuilder<LogStore, SnapshotStore>
where
LogStore: MetaUpdateLogStore + Send + Sync,
SnapshotStore: MetaUpdateSnapshotStore + Send + Sync,
{
async fn build(&self) -> Result<Option<Snapshot>> {
// Load the current snapshot first.
match self.snapshot_store.load().await? {
Some(v) => Ok(Some(self.create_latest_snapshot_with_prev(v).await?)),
None => self.create_latest_snapshot_without_prev().await,
if has_logs {
Ok(Some(Snapshot {
end_seq: latest_seq,
data: manifest_data_builder.build(),
}))
} else {
Ok(None)
}
}
}
/// Memory based snapshot builder

/// Snapshot creator
///
/// Usually, it gets the snapshot from memory and stores it to storage (like
/// disk, oss, etc).
// TODO: remove `LogStore` and its related operations; they should be called directly rather
// than from within the `Snapshotter`.
#[derive(Debug, Clone)]
struct MemorySnapshotBuilder<LogStore, SnapshotStore> {
struct Snapshotter<LogStore, SnapshotStore> {
log_store: LogStore,
snapshot_store: SnapshotStore,
end_seq: SequenceNumber,
Expand All @@ -318,14 +321,13 @@ struct MemorySnapshotBuilder<LogStore, SnapshotStore> {
table_id: TableId,
}

#[async_trait]
impl<LogStore, SnapshotStore> SnapshotBuilder for MemorySnapshotBuilder<LogStore, SnapshotStore>
impl<LogStore, SnapshotStore> Snapshotter<LogStore, SnapshotStore>
where
LogStore: MetaUpdateLogStore + Send + Sync,
SnapshotStore: MetaUpdateSnapshotStore + Send + Sync,
{
/// Create a latest snapshot of the current logs.
async fn build(&self) -> Result<Option<Snapshot>> {
async fn snapshot(&self) -> Result<Option<Snapshot>> {
// Get snapshot data from memory.
let table_snapshot_opt = self
.snapshot_data_provider
Expand Down Expand Up @@ -454,17 +456,15 @@ impl ManifestImpl {
let snapshot_store =
ObjectStoreBasedSnapshotStore::new(space_id, table_id, self.store.clone());
let end_seq = self.wal_manager.sequence_num(location).await.unwrap();
let memory_snapshot_builder = MemorySnapshotBuilder {
let snapshotter = Snapshotter {
log_store,
snapshot_store,
end_seq,
snapshot_data_provider: self.snap_data_provider.clone(),
space_id,
table_id,
};
let snapshotter = Snapshotter {
snapshot_builder: memory_snapshot_builder,
};

let snapshot = snapshotter.snapshot().await?.map(|v| {
self.decrease_num_updates();
v
Expand Down Expand Up @@ -524,14 +524,11 @@ impl Manifest for ManifestImpl {
load_req.table_id,
self.store.clone(),
);
let storage_snapshot_builder = StorageSnapshotBuilder {
let reoverer = SnapshotRecoverer {
log_store,
snapshot_store,
};
let snapshotter = Snapshotter {
snapshot_builder: storage_snapshot_builder,
};
let snapshot = snapshotter.snapshot().await?;
let snapshot = reoverer.recover().await?;

Ok(snapshot.and_then(|v| v.data))
}
Expand Down Expand Up @@ -708,11 +705,6 @@ impl MetaUpdateLogStore for WalBasedLogStore {
}
}

#[derive(Debug, Clone)]
struct Snapshotter<Builder> {
snapshot_builder: Builder,
}

/// The snapshot for the current logs.
#[derive(Debug, Clone, PartialEq)]
struct Snapshot {
Expand Down Expand Up @@ -791,18 +783,6 @@ impl From<Snapshot> for manifest_pb::Snapshot {
}
}

impl<Builder> Snapshotter<Builder>
where
Builder: SnapshotBuilder + Send + Sync,
{
/// Do snapshot for the current logs including:
/// - saving the snapshot.
/// - deleting the expired logs.
pub async fn snapshot(&self) -> Result<Option<Snapshot>> {
self.snapshot_builder.build().await
}
}

#[cfg(test)]
mod tests {
use std::{path::PathBuf, sync::Arc, vec};
Expand Down Expand Up @@ -1529,32 +1509,26 @@ mod tests {
table_id: TableId,
) -> Option<Snapshot> {
let end_seq = log_store.next_seq() - 1;
let memory_builder = MemorySnapshotBuilder {
let snapshotter = Snapshotter {
log_store: log_store.clone(),
snapshot_store: snapshot_store.clone(),
end_seq,
snapshot_data_provider: snapshot_provider,
space_id: 0,
table_id,
};
let snapshotter = Snapshotter {
snapshot_builder: memory_builder,
};
snapshotter.snapshot().await.unwrap()
}

async fn recover_snapshot(
log_store: &MemLogStore,
snapshot_store: &MemSnapshotStore,
) -> Option<Snapshot> {
let storage_builder = StorageSnapshotBuilder {
let recoverer = SnapshotRecoverer {
log_store: log_store.clone(),
snapshot_store: snapshot_store.clone(),
};
let snapshotter = Snapshotter {
snapshot_builder: storage_builder,
};
snapshotter.snapshot().await.unwrap()
recoverer.recover().await.unwrap()
}

#[test]
Expand Down
7 changes: 0 additions & 7 deletions analytic_engine/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,6 @@ pub struct EngineBuilder<'a> {
impl<'a> EngineBuilder<'a> {
pub async fn build(self) -> Result<TableEngineRef> {
let opened_storages = open_storage(self.config.storage.clone()).await?;
// let manifest = ManifestImpl::open(
// self.config.manifest.clone(),
// self.opened_wals.manifest_wal.clone(),
// opened_storages.default_store().clone(),
// )
// .await
// .context(OpenManifest)?;
let manifest_storages = ManifestStorages {
wal_manager: self.opened_wals.manifest_wal.clone(),
oss_storage: opened_storages.default_store().clone(),
Expand Down

0 comments on commit 8ff80ed

Please sign in to comment.