Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix panic when reusing a was-empty batch #267

Merged
merged 6 commits into from
Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

* Unconditionally tolerate `fallocate` failures as a fix to its portability issue. Errors other than `EOPNOTSUPP` will still emit a warning.
* Avoid leaving fractured write after failure by reseeking the file writer. Panic if the reseek fails as well.
* Fix panic when an empty batch is written to engine and then reused.

### New Features

Expand Down
66 changes: 43 additions & 23 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,12 @@ where
/// bytes. If `sync` is true, the write will be followed by a call to
/// `fdatasync` on the log file.
pub fn write(&self, log_batch: &mut LogBatch, mut sync: bool) -> Result<usize> {
if log_batch.is_empty() {
return Ok(0);
}
let start = Instant::now();
let len = log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?;
debug_assert!(len > 0);
let block_handle = {
let mut writer = Writer::new(log_batch, sync);
// Snapshot and clear the current perf context temporarily, so the write group
Expand All @@ -151,16 +155,7 @@ where
writer.entered_time = Some(now);
sync |= writer.sync;
let log_batch = writer.mut_payload();
let res = if !log_batch.is_empty() {
self.pipe_log.append(LogQueue::Append, log_batch)
} else {
// TODO(tabokie): use Option<FileBlockHandle> instead.
Ok(FileBlockHandle {
id: FileId::new(LogQueue::Append, 0),
offset: 0,
len: 0,
})
};
let res = self.pipe_log.append(LogQueue::Append, log_batch);
writer.set_output(res);
}
perf_context!(log_write_duration).observe_since(now);
Expand All @@ -185,20 +180,17 @@ where
set_perf_context(perf_context);
writer.finish()?
};

let mut now = Instant::now();
if len > 0 {
log_batch.finish_write(block_handle);
self.memtables.apply_append_writes(log_batch.drain());
for listener in &self.listeners {
listener.post_apply_memtables(block_handle.id);
}
let end = Instant::now();
let apply_duration = end.saturating_duration_since(now);
ENGINE_WRITE_APPLY_DURATION_HISTOGRAM.observe(apply_duration.as_secs_f64());
perf_context!(apply_duration).observe(apply_duration);
now = end;
}
log_batch.finish_write(block_handle);
self.memtables.apply_append_writes(log_batch.drain());
for listener in &self.listeners {
listener.post_apply_memtables(block_handle.id);
}
let end = Instant::now();
let apply_duration = end.saturating_duration_since(now);
ENGINE_WRITE_APPLY_DURATION_HISTOGRAM.observe(apply_duration.as_secs_f64());
perf_context!(apply_duration).observe(apply_duration);
now = end;
ENGINE_WRITE_DURATION_HISTOGRAM.observe(now.saturating_duration_since(start).as_secs_f64());
ENGINE_WRITE_SIZE_HISTOGRAM.observe(len as f64);
Ok(len)
Expand Down Expand Up @@ -1393,6 +1385,34 @@ mod tests {
);
}

#[test]
fn test_empty_batch() {
let dir = tempfile::Builder::new()
.prefix("test_empty_batch")
.tempdir()
.unwrap();
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
..Default::default()
};
let engine =
RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
.unwrap();
let data = vec![b'x'; 16];
let cases = [[false, false], [false, true], [true, true]];
for (i, writes) in cases.iter().enumerate() {
let rid = i as u64;
let mut batch = LogBatch::default();
for &has_data in writes {
if has_data {
batch.put(rid, b"key".to_vec(), data.clone());
}
engine.write(&mut batch, true).unwrap();
assert!(batch.is_empty());
}
}
}

#[test]
fn test_dirty_recovery() {
let dir = tempfile::Builder::new()
Expand Down