Skip to content

Commit

Permalink
Switch record_payloads to bytes::Buf (#31)
Browse files Browse the repository at this point in the history
* Switch record_payloads to bytes::Buf

Switches record_payloads internface from using &'b [u8] to bytes::Buf.

Closes #29

* Update src/record.rs

Better check for remaining

Co-authored-by: trinity-1686a <trinity.pointard@gmail.com>

---------

Co-authored-by: trinity-1686a <trinity.pointard@gmail.com>
  • Loading branch information
imotov and trinity-1686a authored Mar 1, 2023
1 parent 1c864ed commit 38307b3
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ tokio = {version="1", features=["io-util", "macros", "rt-multi-thread", "fs", "i
async-trait = "0.1"
serde = {version= "1", features=["derive"]}
serde_json = {version= "1"}
bytes = {version = "1"}

[dev-dependencies]
tempfile = "3"
Expand Down
6 changes: 4 additions & 2 deletions src/multi_record_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::ops::RangeBounds;
use std::path::Path;
use std::time::{Duration, Instant};

use bytes::Buf;

use crate::error::{
AppendError, CreateQueueError, DeleteQueueError, ReadRecordError, TruncateError,
};
Expand Down Expand Up @@ -174,7 +176,7 @@ impl MultiRecordLog {
&mut self,
queue: &str,
position_opt: Option<u64>,
payload: &[u8],
payload: impl Buf,
) -> Result<Option<u64>, AppendError> {
self.append_records(queue, position_opt, std::iter::once(payload))
.await
Expand All @@ -186,7 +188,7 @@ impl MultiRecordLog {
/// However this function succeeding does not necessarily means records where stored, be sure
/// to call [`Self::sync`] to make sure changes are persisted if you don't use
/// [`SyncPolicy::OnAppend`] (which is the default).
pub async fn append_records<'a, T: Iterator<Item = &'a [u8]>>(
pub async fn append_records<'a, T: Iterator<Item = impl Buf>>(
&mut self,
queue: &str,
position_opt: Option<u64>,
Expand Down
6 changes: 3 additions & 3 deletions src/proptests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ async fn test_multi_record() {
multi_record_log.create_queue("queue").await.unwrap();
assert_eq!(
multi_record_log
.append_record("queue", None, b"1")
.append_record("queue", None, &b"1"[..])
.await
.unwrap(),
Some(0)
Expand All @@ -314,7 +314,7 @@ async fn test_multi_record() {
let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap();
assert_eq!(
multi_record_log
.append_record("queue", None, b"22")
.append_record("queue", None, &b"22"[..])
.await
.unwrap(),
Some(1)
Expand All @@ -335,7 +335,7 @@ async fn test_multi_record() {
let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap();
assert_eq!(
multi_record_log
.append_record("queue", None, b"hello")
.append_record("queue", None, &b"hello"[..])
.await
.unwrap(),
Some(2)
Expand Down
21 changes: 14 additions & 7 deletions src/record.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::convert::{TryFrom, TryInto};

use bytes::Buf;

use crate::error::MultiRecordCorruption;
use crate::Serializable;

Expand Down Expand Up @@ -141,25 +143,30 @@ impl<'a> MultiRecord<'a> {
}
}

pub fn serialize<'b, T: Iterator<Item = &'b [u8]>>(
pub fn serialize<T: Iterator<Item = impl Buf>>(
record_payloads: T,
position: u64,
output: &mut Vec<u8>,
) {
Self::serialize_with_pos((position..).zip(record_payloads), output);
}

fn serialize_with_pos<'b>(
record_payloads: impl Iterator<Item = (u64, &'b [u8])>,
fn serialize_with_pos(
record_payloads: impl Iterator<Item = (u64, impl Buf)>,
output: &mut Vec<u8>,
) {
output.clear();
for (position, record_payload) in record_payloads {
assert!(record_payload.len() <= u32::MAX as usize);
for (position, mut record_payload) in record_payloads {
assert!(record_payload.remaining() <= u32::MAX as usize);
// TODO add assert for position monotonicity?
let record_payload = &mut record_payload;
output.extend_from_slice(&position.to_le_bytes());
output.extend_from_slice(&(record_payload.len() as u32).to_le_bytes());
output.extend_from_slice(record_payload);
output.extend_from_slice(&(record_payload.remaining() as u32).to_le_bytes());
while record_payload.has_remaining() {
let chunk = record_payload.chunk();
output.extend_from_slice(record_payload.chunk());
record_payload.advance(chunk.len());
}
}
}

Expand Down
66 changes: 49 additions & 17 deletions src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::borrow::Cow;

use bytes::Buf;

use crate::MultiRecordLog;

fn read_all_records<'a>(multi_record_log: &'a MultiRecordLog, queue: &str) -> Vec<Cow<'a, [u8]>> {
Expand Down Expand Up @@ -49,11 +51,11 @@ async fn test_multi_record_log_simple() {
let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap();
multi_record_log.create_queue("queue").await.unwrap();
multi_record_log
.append_record("queue", None, b"hello")
.append_record("queue", None, &b"hello"[..])
.await
.unwrap();
multi_record_log
.append_record("queue", None, b"happy")
.append_record("queue", None, &b"happy"[..])
.await
.unwrap();
assert_eq!(
Expand All @@ -64,18 +66,48 @@ async fn test_multi_record_log_simple() {
}
}

#[tokio::test]
async fn test_multi_record_log_chained() {
let tempdir = tempfile::tempdir().unwrap();
{
let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap();
multi_record_log.create_queue("queue").await.unwrap();
multi_record_log
.append_record(
"queue",
None,
b"world".chain(&b" "[..]).chain(&b"order"[..]),
)
.await
.unwrap();
multi_record_log
.append_record(
"queue",
None,
b"nice"[..].chain(&b" "[..]).chain(&b"day"[..]),
)
.await
.unwrap();
assert_eq!(
&read_all_records(&multi_record_log, "queue"),
&[b"world order".as_slice(), b"nice day".as_slice()]
);
assert_eq!(&multi_record_log.list_file_numbers(), &[0]);
}
}

#[tokio::test]
async fn test_multi_record_log_reopen() {
let tempdir = tempfile::tempdir().unwrap();
{
let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap();
multi_record_log.create_queue("queue").await.unwrap();
multi_record_log
.append_record("queue", None, b"hello")
.append_record("queue", None, &b"hello"[..])
.await
.unwrap();
multi_record_log
.append_record("queue", None, b"happy")
.append_record("queue", None, &b"happy"[..])
.await
.unwrap();
}
Expand All @@ -97,23 +129,23 @@ async fn test_multi_record_log() {
multi_record_log.create_queue("queue1").await.unwrap();
multi_record_log.create_queue("queue2").await.unwrap();
multi_record_log
.append_record("queue1", None, b"hello")
.append_record("queue1", None, &b"hello"[..])
.await
.unwrap();
multi_record_log
.append_record("queue2", None, b"maitre")
.append_record("queue2", None, &b"maitre"[..])
.await
.unwrap();
multi_record_log
.append_record("queue1", None, b"happy")
.append_record("queue1", None, &b"happy"[..])
.await
.unwrap();
multi_record_log
.append_record("queue1", None, b"tax")
.append_record("queue1", None, &b"tax"[..])
.await
.unwrap();
multi_record_log
.append_record("queue2", None, b"corbeau")
.append_record("queue2", None, &b"corbeau"[..])
.await
.unwrap();
assert_eq!(
Expand All @@ -129,7 +161,7 @@ async fn test_multi_record_log() {
{
let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap();
multi_record_log
.append_record("queue1", None, b"bubu")
.append_record("queue1", None, &b"bubu"[..])
.await
.unwrap();
assert_eq!(
Expand All @@ -153,7 +185,7 @@ async fn test_multi_record_position_known_after_truncate() {
multi_record_log.create_queue("queue").await.unwrap();
assert_eq!(
multi_record_log
.append_record("queue", None, b"1")
.append_record("queue", None, &b"1"[..])
.await
.unwrap(),
Some(0)
Expand All @@ -163,7 +195,7 @@ async fn test_multi_record_position_known_after_truncate() {
let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap();
assert_eq!(
multi_record_log
.append_record("queue", None, b"2")
.append_record("queue", None, &b"2"[..])
.await
.unwrap(),
Some(1)
Expand All @@ -179,7 +211,7 @@ async fn test_multi_record_position_known_after_truncate() {
let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap();
assert_eq!(
multi_record_log
.append_record("queue", None, b"hello")
.append_record("queue", None, &b"hello"[..])
.await
.unwrap(),
Some(2)
Expand Down Expand Up @@ -258,22 +290,22 @@ async fn test_truncate_range_correct_pos() {
multi_record_log.create_queue("queue").await.unwrap();
assert_eq!(
multi_record_log
.append_record("queue", None, b"1")
.append_record("queue", None, &b"1"[..])
.await
.unwrap(),
Some(0)
);
assert_eq!(
multi_record_log
.append_record("queue", None, b"2")
.append_record("queue", None, &b"2"[..])
.await
.unwrap(),
Some(1)
);
multi_record_log.truncate("queue", 1).await.unwrap();
assert_eq!(
multi_record_log
.append_record("queue", None, b"3")
.append_record("queue", None, &b"3"[..])
.await
.unwrap(),
Some(2)
Expand Down Expand Up @@ -315,7 +347,7 @@ async fn test_multi_record_size() {
let size_mem_create = multi_record_log.in_memory_size();
assert!(size_mem_create > 0);
multi_record_log
.append_record("queue", None, b"hello")
.append_record("queue", None, &b"hello"[..])
.await
.unwrap();
let size_mem_append = multi_record_log.in_memory_size();
Expand Down

0 comments on commit 38307b3

Please sign in to comment.