Skip to content

Commit

Permalink
fix: persist sst id (#1009)
Browse files Browse the repository at this point in the history
## Rationale
part of #990 

## Detailed Changes
- Abstract a `IdAllocator` in `common_util` library
- Use `IdAllocator` to alloc sst id
- Persist `IdAllocator`'s `max id` to manifest

## Test Plan
- Add new unit test for `IdAllocator`
- Manual compatibility test: make ssts generated with old
ceresdb-server, and deploy a new ceresdb-server with this changeset.
Write some new data into it to make new ssts generated, and check
whether the sst id is correct.
  • Loading branch information
baojinri authored Jul 7, 2023
1 parent c6c306d commit ed63767
Show file tree
Hide file tree
Showing 10 changed files with 245 additions and 50 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 23 additions & 4 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use crate::{
writer::{MetaData, RecordBatchStream},
},
table::{
data::{TableData, TableDataRef},
data::{self, TableData, TableDataRef},
version::{FlushableMemTables, MemTableState, SamplingMemTable},
version_edit::{AddFile, DeleteFile},
},
Expand Down Expand Up @@ -142,6 +142,9 @@ pub enum Error {
msg: Option<String>,
backtrace: Backtrace,
},

#[snafu(display("Failed to alloc file id, err:{}", source))]
AllocFileId { source: data::Error },
}

define_result!(Error);
Expand Down Expand Up @@ -408,6 +411,7 @@ impl FlushTask {
files_to_add: files_to_level0.clone(),
files_to_delete: vec![],
mems_to_remove: mems_to_flush.ids(),
max_file_id: 0,
};
let meta_update = MetaUpdate::VersionEdit(edit_meta);
MetaEditRequest {
Expand Down Expand Up @@ -476,7 +480,12 @@ impl FlushTask {
for time_range in &time_ranges {
let (batch_record_sender, batch_record_receiver) =
channel::<Result<RecordBatchWithKey>>(DEFAULT_CHANNEL_SIZE);
let file_id = self.table_data.alloc_file_id();
let file_id = self
.table_data
.alloc_file_id(&self.space_store.manifest)
.await
.context(AllocFileId)?;

let sst_file_path = self.table_data.set_sst_file_path(file_id);

// TODO: `min_key` & `max_key` should be figured out when writing sst.
Expand Down Expand Up @@ -596,7 +605,12 @@ impl FlushTask {
};

// Alloc file id for next sst file
let file_id = self.table_data.alloc_file_id();
let file_id = self
.table_data
.alloc_file_id(&self.space_store.manifest)
.await
.context(AllocFileId)?;

let sst_file_path = self.table_data.set_sst_file_path(file_id);

let storage_format_hint = self.table_data.table_options().storage_format_hint;
Expand Down Expand Up @@ -669,6 +683,7 @@ impl SpaceStore {
files_to_add: Vec::with_capacity(inputs.len()),
files_to_delete: vec![],
mems_to_remove: vec![],
max_file_id: 0,
};

if task.is_empty() {
Expand Down Expand Up @@ -832,7 +847,11 @@ impl SpaceStore {
};

// Alloc file id for the merged sst.
let file_id = table_data.alloc_file_id();
let file_id = table_data
.alloc_file_id(&self.manifest)
.await
.context(AllocFileId)?;

let sst_file_path = table_data.set_sst_file_path(file_id);

let mut sst_writer = self
Expand Down
1 change: 1 addition & 0 deletions analytic_engine/src/manifest/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,7 @@ mod tests {
files_to_add: vec![],
files_to_delete: vec![],
mems_to_remove: vec![],
max_file_id: 0,
})
}

Expand Down
6 changes: 6 additions & 0 deletions analytic_engine/src/manifest/meta_edit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use wal::log_batch::{Payload, PayloadDecoder};
use crate::{
manifest::meta_snapshot::MetaSnapshot,
space::SpaceId,
sst::manager::FileId,
table::{
data::{MemTableId, TableShardInfo},
version::TableVersionMeta,
Expand Down Expand Up @@ -230,6 +231,7 @@ pub struct VersionEditMeta {
/// Id of memtables to remove from immutable memtable lists.
/// No need to persist.
pub mems_to_remove: Vec<MemTableId>,
pub max_file_id: FileId,
}

impl VersionEditMeta {
Expand All @@ -241,6 +243,7 @@ impl VersionEditMeta {
flushed_sequence: self.flushed_sequence,
files_to_add: self.files_to_add,
files_to_delete: self.files_to_delete,
max_file_id: self.max_file_id,
}
}
}
Expand All @@ -259,6 +262,7 @@ impl From<VersionEditMeta> for manifest_pb::VersionEditMeta {
flushed_sequence: v.flushed_sequence,
files_to_add,
files_to_delete,
max_file_id: v.max_file_id,
}
}
}
Expand All @@ -284,6 +288,7 @@ impl TryFrom<manifest_pb::VersionEditMeta> for VersionEditMeta {
files_to_add,
files_to_delete,
mems_to_remove: Vec::default(),
max_file_id: src.max_file_id,
})
}
}
Expand Down Expand Up @@ -451,6 +456,7 @@ impl From<Snapshot> for manifest_pb::Snapshot {
files_to_add: version_meta.ordered_files(),
files_to_delete: vec![],
mems_to_remove: vec![],
max_file_id: version_meta.max_file_id,
});
(
table_meta,
Expand Down
76 changes: 53 additions & 23 deletions analytic_engine/src/table/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,22 @@ use common_types::{
time::{TimeRange, Timestamp},
SequenceNumber,
};
use common_util::define_result;
use common_util::{
define_result,
error::{GenericError, GenericResult},
id_allocator::IdAllocator,
};
use log::{debug, info};
use object_store::Path;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::table::TableId;

use crate::{
instance::serial_executor::TableOpSerialExecutor,
manifest::meta_edit::AddTableMeta,
manifest::{
meta_edit::{AddTableMeta, MetaEdit, MetaEditRequest, MetaUpdate, VersionEditMeta},
ManifestRef,
},
memtable::{
factory::{FactoryRef as MemTableFactoryRef, Options as MemTableOptions},
skiplist::factory::SkiplistMemTableFactory,
Expand Down Expand Up @@ -69,12 +76,17 @@ pub enum Error {
FindMemTable {
source: crate::table::version::Error,
},

#[snafu(display("Failed to alloc file id, err:{}", source))]
AllocFileId { source: GenericError },
}

define_result!(Error);

pub type MemTableId = u64;

pub const DEFAULT_ALLOC_STEP: u64 = 100;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TableShardInfo {
pub shard_id: ShardId,
Expand Down Expand Up @@ -128,10 +140,8 @@ pub struct TableData {
/// Allocating memtable id should be guarded by write lock
last_memtable_id: AtomicU64,

/// Last id of the sst file
///
/// Write to last_file_id require external synchronization
last_file_id: AtomicU64,
/// Allocating file id
allocator: IdAllocator,

/// Last flush time
///
Expand Down Expand Up @@ -163,7 +173,6 @@ impl fmt::Debug for TableData {
.field("opts", &self.opts)
.field("last_sequence", &self.last_sequence)
.field("last_memtable_id", &self.last_memtable_id)
.field("last_file_id", &self.last_file_id)
.field("dropped", &self.dropped.load(Ordering::Relaxed))
.field("shard_info", &self.shard_info)
.finish()
Expand Down Expand Up @@ -230,7 +239,7 @@ impl TableData {
current_version,
last_sequence: AtomicU64::new(0),
last_memtable_id: AtomicU64::new(0),
last_file_id: AtomicU64::new(0),
allocator: IdAllocator::new(0, 0, DEFAULT_ALLOC_STEP),
last_flush_time_ms: AtomicU64::new(0),
dropped: AtomicBool::new(false),
metrics,
Expand All @@ -248,6 +257,7 @@ impl TableData {
shard_id: ShardId,
preflush_write_buffer_size_ratio: f32,
mem_usage_collector: CollectorRef,
allocator: IdAllocator,
) -> Result<Self> {
let memtable_factory = Arc::new(SkiplistMemTableFactory);
let purge_queue = purger.create_purge_queue(add_meta.space_id, add_meta.table_id);
Expand All @@ -271,7 +281,7 @@ impl TableData {
current_version,
last_sequence: AtomicU64::new(0),
last_memtable_id: AtomicU64::new(0),
last_file_id: AtomicU64::new(0),
allocator,
last_flush_time_ms: AtomicU64::new(0),
dropped: AtomicBool::new(false),
metrics,
Expand Down Expand Up @@ -480,22 +490,43 @@ impl TableData {
should_flush
}

/// Set `last_file_id`, mainly used in recover
///
/// This operation require external synchronization
pub fn set_last_file_id(&self, last_file_id: FileId) {
self.last_file_id.store(last_file_id, Ordering::Relaxed);
}
/// Use allocator to alloc a file id for a new file.
pub async fn alloc_file_id(&self, manifest: &ManifestRef) -> Result<FileId> {
// Persist next max file id to manifest.
let persist_max_file_id = move |next_max_file_id| async move {
self.persist_max_file_id(manifest, next_max_file_id).await
};

/// Returns the last file id
pub fn last_file_id(&self) -> FileId {
self.last_file_id.load(Ordering::Relaxed)
self.allocator
.alloc_id(persist_max_file_id)
.await
.context(AllocFileId)
}

/// Alloc a file id for a new file
pub fn alloc_file_id(&self) -> FileId {
let last = self.last_file_id.fetch_add(1, Ordering::Relaxed);
last + 1
async fn persist_max_file_id(
&self,
manifest: &ManifestRef,
next_max_file_id: FileId,
) -> GenericResult<()> {
let manifest_update = VersionEditMeta {
space_id: self.space_id,
table_id: self.id,
flushed_sequence: 0,
files_to_add: vec![],
files_to_delete: vec![],
mems_to_remove: vec![],
max_file_id: next_max_file_id,
};
let edit_req = {
let meta_update = MetaUpdate::VersionEdit(manifest_update);
MetaEditRequest {
shard_info: self.shard_info,
meta_edit: MetaEdit::Update(meta_update),
}
};
// table version's max file id will be update when apply this meta update.
manifest.apply_edit(edit_req).await?;
Ok(())
}

/// Set the sst file path into the object storage path.
Expand Down Expand Up @@ -746,7 +777,6 @@ pub mod tests {
assert_eq!(TableShardInfo::new(shard_id), table_data.shard_info);
assert_eq!(0, table_data.last_sequence());
assert!(!table_data.is_dropped());
assert_eq!(0, table_data.last_file_id());
assert_eq!(0, table_data.last_memtable_id());
assert!(table_data.dedup());
}
Expand Down
22 changes: 21 additions & 1 deletion analytic_engine/src/table/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::{
manager::{FileId, LevelsController},
},
table::{
data::MemTableId,
data::{MemTableId, DEFAULT_ALLOC_STEP},
version_edit::{AddFile, VersionEdit},
},
};
Expand Down Expand Up @@ -520,6 +520,12 @@ struct TableVersionInner {
/// The earliest sequence number of the entries already flushed (inclusive).
/// All log entry with sequence <= `flushed_sequence` can be deleted
flushed_sequence: SequenceNumber,
/// Max id of the sst file.
///
/// The id is allocated by step, so there are some still unused ids smaller
/// than the max one. And this field is only a mem state for Manifest,
/// it can only be updated during recover or by Manifest.
max_file_id: FileId,
}

impl TableVersionInner {
Expand Down Expand Up @@ -558,6 +564,7 @@ impl TableVersion {
memtable_view: MemTableView::new(),
levels_controller: LevelsController::new(purge_queue),
flushed_sequence: 0,
max_file_id: 0,
}),
}
}
Expand Down Expand Up @@ -672,6 +679,8 @@ impl TableVersion {
// TODO(yingwen): else, log warning
inner.flushed_sequence = cmp::max(inner.flushed_sequence, edit.flushed_sequence);

inner.max_file_id = cmp::max(inner.max_file_id, edit.max_file_id);

// Add sst files to level first.
for add_file in edit.files_to_add {
inner
Expand All @@ -698,6 +707,8 @@ impl TableVersion {

inner.flushed_sequence = cmp::max(inner.flushed_sequence, meta.flushed_sequence);

inner.max_file_id = cmp::max(inner.max_file_id, meta.max_file_id);

for add_file in meta.files.into_values() {
inner
.levels_controller
Expand Down Expand Up @@ -782,13 +793,15 @@ impl TableVersion {
TableVersionSnapshot {
flushed_sequence: inner.flushed_sequence,
files,
max_file_id: inner.max_file_id,
}
}
}

pub struct TableVersionSnapshot {
pub flushed_sequence: SequenceNumber,
pub files: HashMap<FileId, AddFile>,
pub max_file_id: FileId,
}

/// During recovery, we apply all version edit to [TableVersionMeta] first, then
Expand All @@ -811,6 +824,12 @@ impl TableVersionMeta {
self.files.insert(add_file.file.id, add_file);
}

self.max_file_id = cmp::max(self.max_file_id, edit.max_file_id);

// aligned max file id.
self.max_file_id =
(self.max_file_id + DEFAULT_ALLOC_STEP - 1) / DEFAULT_ALLOC_STEP * DEFAULT_ALLOC_STEP;

for delete_file in edit.files_to_delete {
self.files.remove(&delete_file.file_id);
}
Expand Down Expand Up @@ -1103,6 +1122,7 @@ mod tests {
mems_to_remove: vec![memtable_id1, memtable_id2],
files_to_add: vec![add_file],
files_to_delete: vec![],
max_file_id: 0,
};
version.apply_edit(edit);

Expand Down
Loading

0 comments on commit ed63767

Please sign in to comment.