fix append recovery bug #251

Merged · 2 commits · Jul 26, 2022
src/memtable.rs (106 changes: 82 additions & 24 deletions)
@@ -408,28 +408,25 @@ impl<A: AllocatorTrait> MemTable<A> {
}
}

/// Appends some entries from rewrite queue. Assumes this table has no
/// append data.
/// Appends some entries from append queue. Assumes this table has no
/// rewrite data.
///
/// This method is only used for recovery.
pub fn append_rewrite(&mut self, entry_indexes: Vec<EntryIndex>) {
pub fn replay_append(&mut self, entry_indexes: Vec<EntryIndex>) {
let len = entry_indexes.len();
if len > 0 {
debug_assert_eq!(self.rewrite_count, self.entry_indexes.len());
debug_assert_eq!(self.rewrite_count, 0);
self.prepare_append(
entry_indexes[0].index,
// Rewrite -> Compact Append -> Rewrite.
true, /* allow_hole */
// Refer to case in `merge_append_table`. They can be adapted
// to attack this path via a global rewrite without deleting
// obsolete rewrite files.
false, /* allow_hole */
// Refer to case in `merge_newer_neighbor`.
true, /* allow_overwrite */
);
self.global_stats.add(LogQueue::Rewrite, len);
self.global_stats.add(LogQueue::Append, len);
for ei in &entry_indexes {
debug_assert_eq!(ei.entries.unwrap().id.queue, LogQueue::Append);
self.entry_indexes.push_back(ei.into());
}
self.rewrite_count = self.entry_indexes.len();
}
}

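For context on the flag flip above: `replay_append` now calls `prepare_append` with `allow_hole = false` and `allow_overwrite = true`, i.e. replaying the append queue must reject gaps in the index sequence but tolerate a batch that starts at or below the current tail (the truncate-and-reappend case referenced by `merge_newer_neighbor`). A minimal sketch of how such flags can gate the two anomalies; the function below is illustrative only and is not the actual `prepare_append` implementation:

// Illustrative only: classify an incoming batch of entry indexes against the
// current tail using allow_hole / allow_overwrite style switches.
fn check_incoming(
    tail: u64,
    first_incoming: u64,
    allow_hole: bool,
    allow_overwrite: bool,
) -> Result<(), String> {
    if first_incoming > tail + 1 && !allow_hole {
        // A gap between the existing tail and the incoming entries means data
        // was lost somewhere; recovery should fail loudly.
        return Err(format!("hole between {} and {}", tail, first_incoming));
    }
    if first_incoming <= tail && !allow_overwrite {
        // The incoming entries overwrite part of the existing range.
        return Err(format!("overwrite at {}", first_incoming));
    }
    Ok(())
}

fn main() {
    // Append replay: overwrite is legal (a raft log can be truncated and
    // re-appended), a hole is not.
    assert!(check_incoming(20, 5, false, true).is_ok());
    assert!(check_incoming(20, 30, false, true).is_err());
}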
@@ -507,6 +504,31 @@ impl<A: AllocatorTrait> MemTable<A> {
self.rewrite_count = pos + rewrite_len;
}

/// Appends some entries from rewrite queue. Assumes this table has no
/// append data.
///
/// This method is only used for recovery.
pub fn replay_rewrite(&mut self, entry_indexes: Vec<EntryIndex>) {
let len = entry_indexes.len();
if len > 0 {
debug_assert_eq!(self.rewrite_count, self.entry_indexes.len());
self.prepare_append(
entry_indexes[0].index,
// Rewrite -> Compact Append -> Rewrite.
true, /* allow_hole */
// Refer to case in `merge_append_table`. They can be adapted
// to attack this path via a global rewrite without deleting
// obsolete rewrite files.
true, /* allow_overwrite */
);
self.global_stats.add(LogQueue::Rewrite, len);
for ei in &entry_indexes {
self.entry_indexes.push_back(ei.into());
}
self.rewrite_count = self.entry_indexes.len();
}
}

/// Removes all entries with index smaller than `index`. Returns the number
/// of deleted entries.
pub fn compact_to(&mut self, index: u64) -> u64 {
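The opposite choice, `allow_hole = true`, in `replay_rewrite` above covers the `Rewrite -> Compact Append -> Rewrite` sequence mentioned in the comment. A hedged sketch of the rewrite-queue contents such a sequence can leave behind (the index values are invented for illustration):

// Illustrative only: why replaying the rewrite queue alone can see a gap.
fn main() {
    // 1. Entries 1..=10 are rewritten; the rewrite queue stores [1, 10].
    let first_batch = (1u64, 10u64);
    // 2. Entries 11..=30 are appended and the region is compacted up to 20;
    //    nothing from 11..=19 ever reaches the rewrite queue.
    // 3. Entries 20..=30 are rewritten, appending [20, 30] to the rewrite queue.
    let second_batch = (20u64, 30u64);
    // Replay of the rewrite queue therefore jumps from index 10 to index 20:
    // a hole, but not corruption.
    assert!(second_batch.0 > first_batch.1 + 1);
}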
@@ -1059,6 +1081,38 @@ impl<A: AllocatorTrait> MemTableAccessor<A> {
}
}

/// Applies changes from log items that are replayed from an append queue.
/// Assumes it hasn't applied any rewrite data.
///
/// This method is only used for recovery.
pub fn replay_append_writes(&self, log_items: LogItemDrain) {
for item in log_items {
let raft = item.raft_group_id;
let memtable = self.get_or_insert(raft);
match item.content {
LogItemContent::EntryIndexes(entries_to_add) => {
memtable.write().replay_append(entries_to_add.0);
}
LogItemContent::Command(Command::Clean) => {
self.remove(raft, true /* record_tombstone */);
}
LogItemContent::Command(Command::Compact { index }) => {
memtable.write().compact_to(index);
}
LogItemContent::Kv(kv) => match kv.op_type {
OpType::Put => {
let value = kv.value.unwrap();
memtable.write().put(kv.key, value, kv.file_id.unwrap());
}
OpType::Del => {
let key = kv.key;
memtable.write().delete(key.as_slice());
}
},
}
}
}

/// Applies changes from log items that have been written to rewrite queue.
pub fn apply_rewrite_writes(
&self,
@@ -1090,15 +1144,16 @@ impl<A: AllocatorTrait> MemTableAccessor<A> {
/// Assumes it hasn't applied any append data.
///
/// This method is only used for recovery.
pub fn apply_replayed_rewrite_writes(&self, log_items: LogItemDrain) {
pub fn replay_rewrite_writes(&self, log_items: LogItemDrain) {
for item in log_items {
let raft = item.raft_group_id;
let memtable = self.get_or_insert(raft);
match item.content {
LogItemContent::EntryIndexes(entries_to_add) => {
memtable.write().append_rewrite(entries_to_add.0);
memtable.write().replay_rewrite(entries_to_add.0);
}
LogItemContent::Command(Command::Clean) => {
// Only append tombstone needs to be recorded.
self.remove(raft, false /* record_tombstone */);
}
LogItemContent::Command(Command::Compact { index }) => {
@@ -1184,21 +1239,17 @@ impl<A: AllocatorTrait> ReplayMachine for MemTableRecoverContext<A> {
}
}
match file_id.queue {
LogQueue::Append => self.memtables.apply_append_writes(item_batch.drain()),
LogQueue::Rewrite => self
.memtables
.apply_replayed_rewrite_writes(item_batch.drain()),
LogQueue::Append => self.memtables.replay_append_writes(item_batch.drain()),
LogQueue::Rewrite => self.memtables.replay_rewrite_writes(item_batch.drain()),
}
Ok(())
}

fn merge(&mut self, mut rhs: Self, queue: LogQueue) -> Result<()> {
self.log_batch.merge(&mut rhs.log_batch.clone());
match queue {
LogQueue::Append => self.memtables.apply_append_writes(rhs.log_batch.drain()),
LogQueue::Rewrite => self
.memtables
.apply_replayed_rewrite_writes(rhs.log_batch.drain()),
LogQueue::Append => self.memtables.replay_append_writes(rhs.log_batch.drain()),
LogQueue::Rewrite => self.memtables.replay_rewrite_writes(rhs.log_batch.drain()),
}
self.memtables.merge_newer_neighbor(rhs.memtables);
Ok(())
@@ -2045,7 +2096,7 @@ mod tests {
memtable.compact_to(7);
}
Some(LogQueue::Rewrite) => {
memtable.append_rewrite(generate_entry_indexes(
memtable.replay_rewrite(generate_entry_indexes(
0,
7,
FileId::new(LogQueue::Rewrite, 1),
@@ -2087,7 +2138,7 @@ mod tests {
memtable.compact_to(10);
}
Some(LogQueue::Rewrite) => {
memtable.append_rewrite(generate_entry_indexes(
memtable.replay_rewrite(generate_entry_indexes(
0,
7,
FileId::new(LogQueue::Rewrite, 1),
@@ -2138,7 +2189,7 @@ mod tests {
memtable.merge_newer_neighbor(&mut m1);
}
Some(LogQueue::Rewrite) => {
memtable.append_rewrite(generate_entry_indexes(
memtable.replay_rewrite(generate_entry_indexes(
0,
10,
FileId::new(LogQueue::Rewrite, 1),
@@ -2220,6 +2271,13 @@ mod tests {
batches[1].add_command(last_rid, Command::Compact { index: 5 });
batches[2].add_entry_indexes(last_rid, generate_entry_indexes(11, 21, files[2]));

// entries [1, 10] => entries [11, 20][5, 10] => compact 8
last_rid += 1;
batches[0].add_entry_indexes(last_rid, generate_entry_indexes(1, 11, files[0]));
batches[1].add_entry_indexes(last_rid, generate_entry_indexes(11, 21, files[1]));
batches[1].add_entry_indexes(last_rid, generate_entry_indexes(5, 11, files[1]));
batches[2].add_command(last_rid, Command::Compact { index: 8 });

for b in batches.iter_mut() {
b.finish_write(FileBlockHandle::dummy(LogQueue::Append));
}
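For the new test region above (entries [1, 10] => entries [11, 20][5, 10] => compact 8), the expected recovery outcome, assuming `generate_entry_indexes(a, b, ..)` yields indexes in [a, b) and an append at an existing index truncates the old tail, is sketched below; this is a hand-worked illustration, not code from the test:

// Illustrative only: how the log range of the new test region evolves.
fn main() {
    // entries [1, 10]
    let (mut first, mut last) = (1u64, 10u64);
    // entries [11, 20]: contiguous append extends the tail
    last = 20;
    assert_eq!((first, last), (1, 20));
    // entries [5, 10]: the overwrite truncates the tail back to 10; there is
    // no hole here, which is exactly the case replay_append must accept
    last = 10;
    assert_eq!((first, last), (1, 10));
    // Compact { index: 8 }: entries below index 8 are dropped
    first = 8;
    assert_eq!((first, last), (8, 10));
}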
tests/failpoints/test_engine.rs (4 changes: 4 additions & 0 deletions)
@@ -518,7 +518,11 @@ fn test_concurrent_write_perf_context() {
}
}

// FIXME: this test no longer works because recovery cannot reliably detect
// overwrite anomaly.
// See https://github.com/tikv/raft-engine/issues/250
#[test]
#[should_panic]
fn test_recycle_with_stale_logbatch_at_tail() {
let dir = tempfile::Builder::new()
.prefix("test_recycle_with_stale_log_batch_at_tail")