diff --git a/Cargo.toml b/Cargo.toml index c748e73..255d8a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ tokio = {version="1", features=["io-util", "macros", "rt-multi-thread", "fs", "i async-trait = "0.1" serde = {version= "1", features=["derive"]} serde_json = {version= "1"} +bytes = {version = "1"} [dev-dependencies] tempfile = "3" diff --git a/src/multi_record_log.rs b/src/multi_record_log.rs index 7e49c65..14f54c2 100644 --- a/src/multi_record_log.rs +++ b/src/multi_record_log.rs @@ -4,6 +4,8 @@ use std::ops::RangeBounds; use std::path::Path; use std::time::{Duration, Instant}; +use bytes::Buf; + use crate::error::{ AppendError, CreateQueueError, DeleteQueueError, ReadRecordError, TruncateError, }; @@ -174,7 +176,7 @@ impl MultiRecordLog { &mut self, queue: &str, position_opt: Option, - payload: &[u8], + payload: impl Buf, ) -> Result, AppendError> { self.append_records(queue, position_opt, std::iter::once(payload)) .await @@ -186,7 +188,7 @@ impl MultiRecordLog { /// However this function succeeding does not necessarily means records where stored, be sure /// to call [`Self::sync`] to make sure changes are persisted if you don't use /// [`SyncPolicy::OnAppend`] (which is the default). - pub async fn append_records<'a, T: Iterator>( + pub async fn append_records<'a, T: Iterator>( &mut self, queue: &str, position_opt: Option, diff --git a/src/proptests.rs b/src/proptests.rs index 5fa798a..f665494 100644 --- a/src/proptests.rs +++ b/src/proptests.rs @@ -304,7 +304,7 @@ async fn test_multi_record() { multi_record_log.create_queue("queue").await.unwrap(); assert_eq!( multi_record_log - .append_record("queue", None, b"1") + .append_record("queue", None, &b"1"[..]) .await .unwrap(), Some(0) @@ -314,7 +314,7 @@ async fn test_multi_record() { let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap(); assert_eq!( multi_record_log - .append_record("queue", None, b"22") + .append_record("queue", None, &b"22"[..]) .await .unwrap(), Some(1) @@ -335,7 +335,7 @@ async fn test_multi_record() { let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap(); assert_eq!( multi_record_log - .append_record("queue", None, b"hello") + .append_record("queue", None, &b"hello"[..]) .await .unwrap(), Some(2) diff --git a/src/record.rs b/src/record.rs index d00da46..9ebe94c 100644 --- a/src/record.rs +++ b/src/record.rs @@ -1,5 +1,7 @@ use std::convert::{TryFrom, TryInto}; +use bytes::Buf; + use crate::error::MultiRecordCorruption; use crate::Serializable; @@ -141,7 +143,7 @@ impl<'a> MultiRecord<'a> { } } - pub fn serialize<'b, T: Iterator>( + pub fn serialize>( record_payloads: T, position: u64, output: &mut Vec, @@ -149,17 +151,22 @@ impl<'a> MultiRecord<'a> { Self::serialize_with_pos((position..).zip(record_payloads), output); } - fn serialize_with_pos<'b>( - record_payloads: impl Iterator, + fn serialize_with_pos( + record_payloads: impl Iterator, output: &mut Vec, ) { output.clear(); - for (position, record_payload) in record_payloads { - assert!(record_payload.len() <= u32::MAX as usize); + for (position, mut record_payload) in record_payloads { + assert!(record_payload.remaining() <= u32::MAX as usize); // TODO add assert for position monotonicity? + let record_payload = &mut record_payload; output.extend_from_slice(&position.to_le_bytes()); - output.extend_from_slice(&(record_payload.len() as u32).to_le_bytes()); - output.extend_from_slice(record_payload); + output.extend_from_slice(&(record_payload.remaining() as u32).to_le_bytes()); + while record_payload.remaining() > 0 { + let chunk = record_payload.chunk(); + output.extend_from_slice(record_payload.chunk()); + record_payload.advance(chunk.len()); + } } } diff --git a/src/tests.rs b/src/tests.rs index 253aec5..99dff6e 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -1,5 +1,7 @@ use std::borrow::Cow; +use bytes::Buf; + use crate::MultiRecordLog; fn read_all_records<'a>(multi_record_log: &'a MultiRecordLog, queue: &str) -> Vec> { @@ -49,11 +51,11 @@ async fn test_multi_record_log_simple() { let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap(); multi_record_log.create_queue("queue").await.unwrap(); multi_record_log - .append_record("queue", None, b"hello") + .append_record("queue", None, &b"hello"[..]) .await .unwrap(); multi_record_log - .append_record("queue", None, b"happy") + .append_record("queue", None, &b"happy"[..]) .await .unwrap(); assert_eq!( @@ -64,6 +66,36 @@ async fn test_multi_record_log_simple() { } } +#[tokio::test] +async fn test_multi_record_log_chained() { + let tempdir = tempfile::tempdir().unwrap(); + { + let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap(); + multi_record_log.create_queue("queue").await.unwrap(); + multi_record_log + .append_record( + "queue", + None, + b"world".chain(&b" "[..]).chain(&b"order"[..]), + ) + .await + .unwrap(); + multi_record_log + .append_record( + "queue", + None, + b"nice"[..].chain(&b" "[..]).chain(&b"day"[..]), + ) + .await + .unwrap(); + assert_eq!( + &read_all_records(&multi_record_log, "queue"), + &[b"world order".as_slice(), b"nice day".as_slice()] + ); + assert_eq!(&multi_record_log.list_file_numbers(), &[0]); + } +} + #[tokio::test] async fn test_multi_record_log_reopen() { let tempdir = tempfile::tempdir().unwrap(); @@ -71,11 +103,11 @@ async fn test_multi_record_log_reopen() { let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap(); multi_record_log.create_queue("queue").await.unwrap(); multi_record_log - .append_record("queue", None, b"hello") + .append_record("queue", None, &b"hello"[..]) .await .unwrap(); multi_record_log - .append_record("queue", None, b"happy") + .append_record("queue", None, &b"happy"[..]) .await .unwrap(); } @@ -97,23 +129,23 @@ async fn test_multi_record_log() { multi_record_log.create_queue("queue1").await.unwrap(); multi_record_log.create_queue("queue2").await.unwrap(); multi_record_log - .append_record("queue1", None, b"hello") + .append_record("queue1", None, &b"hello"[..]) .await .unwrap(); multi_record_log - .append_record("queue2", None, b"maitre") + .append_record("queue2", None, &b"maitre"[..]) .await .unwrap(); multi_record_log - .append_record("queue1", None, b"happy") + .append_record("queue1", None, &b"happy"[..]) .await .unwrap(); multi_record_log - .append_record("queue1", None, b"tax") + .append_record("queue1", None, &b"tax"[..]) .await .unwrap(); multi_record_log - .append_record("queue2", None, b"corbeau") + .append_record("queue2", None, &b"corbeau"[..]) .await .unwrap(); assert_eq!( @@ -129,7 +161,7 @@ async fn test_multi_record_log() { { let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap(); multi_record_log - .append_record("queue1", None, b"bubu") + .append_record("queue1", None, &b"bubu"[..]) .await .unwrap(); assert_eq!( @@ -153,7 +185,7 @@ async fn test_multi_record_position_known_after_truncate() { multi_record_log.create_queue("queue").await.unwrap(); assert_eq!( multi_record_log - .append_record("queue", None, b"1") + .append_record("queue", None, &b"1"[..]) .await .unwrap(), Some(0) @@ -163,7 +195,7 @@ async fn test_multi_record_position_known_after_truncate() { let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap(); assert_eq!( multi_record_log - .append_record("queue", None, b"2") + .append_record("queue", None, &b"2"[..]) .await .unwrap(), Some(1) @@ -179,7 +211,7 @@ async fn test_multi_record_position_known_after_truncate() { let mut multi_record_log = MultiRecordLog::open(tempdir.path()).await.unwrap(); assert_eq!( multi_record_log - .append_record("queue", None, b"hello") + .append_record("queue", None, &b"hello"[..]) .await .unwrap(), Some(2) @@ -258,14 +290,14 @@ async fn test_truncate_range_correct_pos() { multi_record_log.create_queue("queue").await.unwrap(); assert_eq!( multi_record_log - .append_record("queue", None, b"1") + .append_record("queue", None, &b"1"[..]) .await .unwrap(), Some(0) ); assert_eq!( multi_record_log - .append_record("queue", None, b"2") + .append_record("queue", None, &b"2"[..]) .await .unwrap(), Some(1) @@ -273,7 +305,7 @@ async fn test_truncate_range_correct_pos() { multi_record_log.truncate("queue", 1).await.unwrap(); assert_eq!( multi_record_log - .append_record("queue", None, b"3") + .append_record("queue", None, &b"3"[..]) .await .unwrap(), Some(2) @@ -315,7 +347,7 @@ async fn test_multi_record_size() { let size_mem_create = multi_record_log.in_memory_size(); assert!(size_mem_create > 0); multi_record_log - .append_record("queue", None, b"hello") + .append_record("queue", None, &b"hello"[..]) .await .unwrap(); let size_mem_append = multi_record_log.in_memory_size();