diff --git a/analytic_engine/src/instance/write.rs b/analytic_engine/src/instance/write.rs index c962fa380f..d1e40f2566 100644 --- a/analytic_engine/src/instance/write.rs +++ b/analytic_engine/src/instance/write.rs @@ -349,9 +349,8 @@ where let mut log_batch = LogWriteBatch::new(table_data.wal_region_id()); // Now we only have one request, so no need to use with_capacity - log_batch.push(LogWriteEntry { - payload: WritePayload::Write(&write_req_pb), - }); + let payload = WritePayload::Write(&write_req_pb); + log_batch.push(LogWriteEntry { payload: &payload }); // Write to wal manager let write_ctx = WriteContext::default(); diff --git a/analytic_engine/src/meta/details.rs b/analytic_engine/src/meta/details.rs index 7492d66683..04fac30fca 100644 --- a/analytic_engine/src/meta/details.rs +++ b/analytic_engine/src/meta/details.rs @@ -171,9 +171,8 @@ impl ManifestImpl { let region_id = Self::region_id_of_meta_update(&update); let mut log_batch = LogWriteBatch::new(region_id); - log_batch.push(LogWriteEntry { - payload: MetaUpdatePayload::from(MetaUpdateLogEntry::Normal(update)), - }); + let payload: MetaUpdatePayload = MetaUpdateLogEntry::Normal(update).into(); + log_batch.push(LogWriteEntry { payload: &payload }); let write_ctx = WriteContext::default(); @@ -311,10 +310,15 @@ impl MetaUpdateLogStore for RegionWal { async fn store(&self, log_entries: &[MetaUpdateLogEntry]) -> Result<()> { let mut log_batch = LogWriteBatch::new(self.region_id); + let mut payload_batch = Vec::with_capacity(log_entries.len()); + for entry in log_entries { - log_batch.push(LogWriteEntry { - payload: MetaUpdatePayload::from(entry), - }); + let payload = MetaUpdatePayload::from(entry); + payload_batch.push(payload); + } + + for payload in payload_batch.iter() { + log_batch.push(LogWriteEntry { payload }); } let write_ctx = WriteContext::default(); @@ -855,7 +859,6 @@ mod tests { manifest_data_builder: &mut TableManifestDataBuilder, ) { let manifest = self.open_manifest().await; - self.add_table_with_manifest(table_id, manifest_data_builder, &manifest) .await; } diff --git a/analytic_engine/src/meta/meta_update.rs b/analytic_engine/src/meta/meta_update.rs index 923dfbdde7..07f8fd6bbe 100644 --- a/analytic_engine/src/meta/meta_update.rs +++ b/analytic_engine/src/meta/meta_update.rs @@ -434,15 +434,21 @@ impl From<&MetaUpdateLogEntry> for MetaUpdatePayload { } impl Payload for MetaUpdatePayload { - type Error = Error; - fn encode_size(&self) -> usize { self.0.compute_size().try_into().unwrap_or(0) } - fn encode_to(&self, buf: &mut B) -> Result<()> { + fn encode_to( + &self, + buf: &mut dyn MemBufMut, + ) -> std::result::Result<(), Box> { let mut writer = Writer::new(buf); - self.0.write_to_writer(&mut writer).context(EncodePayloadPb) + self.0 + .write_to_writer(&mut writer) + .context(EncodePayloadPb) + .map_err(Box::new)?; + + Ok(()) } } diff --git a/analytic_engine/src/payload.rs b/analytic_engine/src/payload.rs index 02cf58fe0a..8e60fb0372 100644 --- a/analytic_engine/src/payload.rs +++ b/analytic_engine/src/payload.rs @@ -75,7 +75,7 @@ impl Header { } } -fn write_header(header: Header, buf: &mut B) -> Result<()> { +fn write_header(header: Header, buf: &mut dyn MemBufMut) -> Result<()> { buf.write_u8(header.to_u8()).context(EncodeHeader)?; Ok(()) } @@ -90,8 +90,6 @@ pub enum WritePayload<'a> { } impl<'a> Payload for WritePayload<'a> { - type Error = Error; - fn encode_size(&self) -> usize { let body_size = match self { WritePayload::Write(req) => req.compute_size(), @@ -100,12 +98,17 @@ impl<'a> Payload for WritePayload<'a> { HEADER_SIZE + body_size as usize } - fn encode_to(&self, buf: &mut B) -> Result<()> { + fn encode_to( + &self, + buf: &mut dyn MemBufMut, + ) -> std::result::Result<(), Box> { match self { WritePayload::Write(req) => { write_header(Header::Write, buf)?; let mut writer = Writer::new(buf); - req.write_to_writer(&mut writer).context(EncodeBody)?; + req.write_to_writer(&mut writer) + .context(EncodeBody) + .map_err(Box::new)?; } } diff --git a/components/bytes/src/lib.rs b/components/bytes/src/lib.rs index 015aabce0c..6b69817a32 100644 --- a/components/bytes/src/lib.rs +++ b/components/bytes/src/lib.rs @@ -220,18 +220,18 @@ impl MemBufMut for Vec { /// A `MemBufMut` adapter which implements [std::io::Write] for the inner value #[derive(Debug)] -pub struct Writer<'a, B> { - buf: &'a mut B, +pub struct Writer<'a> { + buf: &'a mut dyn MemBufMut, } -impl<'a, B: MemBufMut> Writer<'a, B> { +impl<'a> Writer<'a> { /// Create a new Writer from a mut ref to buf - pub fn new(buf: &'a mut B) -> Self { + pub fn new(buf: &'a mut dyn MemBufMut) -> Self { Self { buf } } } -impl<'a, B: MemBufMut> Write for Writer<'a, B> { +impl<'a> Write for Writer<'a> { fn write(&mut self, src: &[u8]) -> io::Result { self.buf.write_slice(src).map_err(|e| match &e { Error::UnexpectedEof { .. } => io::Error::new(io::ErrorKind::UnexpectedEof, e), diff --git a/wal/src/kv_encoder.rs b/wal/src/kv_encoder.rs index 84ad2833cf..c317db628f 100644 --- a/wal/src/kv_encoder.rs +++ b/wal/src/kv_encoder.rs @@ -214,23 +214,20 @@ impl LogValueEncoder { } } -impl Encoder for LogValueEncoder { - type Error = Error; - +impl LogValueEncoder { /// Value format: /// +--------------------+---------+ /// | version_header(u8) | payload | /// +--------------------+---------+ - fn encode(&self, buf: &mut B, payload: &T) -> Result<()> { + pub fn encode(&self, buf: &mut B, payload: &dyn Payload) -> Result<()> { buf.write_u8(self.version).context(EncodeLogValueHeader)?; payload - .encode_to(buf) - .map_err(|e| Box::new(e) as _) + .encode_to(buf as &mut dyn MemBufMut) .context(EncodeLogValuePayload) } - fn estimate_encoded_size(&self, payload: &T) -> usize { + pub fn estimate_encoded_size(&self, payload: &dyn Payload) -> usize { // Refer to value format. 1 + payload.encode_size() } diff --git a/wal/src/log_batch.rs b/wal/src/log_batch.rs index 7e08c6c10d..b1c234111c 100644 --- a/wal/src/log_batch.rs +++ b/wal/src/log_batch.rs @@ -12,11 +12,13 @@ use common_types::{ use crate::manager::RegionId; pub trait Payload: Send + Sync + Debug { - type Error: std::error::Error + Send + Sync + 'static; /// Compute size of the encoded payload. fn encode_size(&self) -> usize; /// Append the encoded payload to the `buf`. - fn encode_to(&self, buf: &mut B) -> Result<(), Self::Error>; + fn encode_to( + &self, + buf: &mut dyn MemBufMut, + ) -> Result<(), Box>; } #[derive(Debug)] @@ -31,18 +33,18 @@ pub struct LogEntry

{ /// `PayloadEncoder`. `region_id` is a logically region and set it as 0 if /// unnecessary. #[derive(Debug)] -pub struct LogWriteEntry

{ - pub payload: P, +pub struct LogWriteEntry<'a> { + pub payload: &'a dyn Payload, } /// A batch of `LogWriteEntry`s. #[derive(Debug)] -pub struct LogWriteBatch

{ +pub struct LogWriteBatch<'a> { pub(crate) region_id: RegionId, - pub(crate) entries: Vec>, + pub(crate) entries: Vec>, } -impl LogWriteBatch

{ +impl<'a> LogWriteBatch<'a> { pub fn new(region_id: RegionId) -> Self { Self::with_capacity(region_id, 0) } @@ -55,7 +57,7 @@ impl LogWriteBatch

{ } #[inline] - pub fn push(&mut self, entry: LogWriteEntry

) { + pub fn push(&mut self, entry: LogWriteEntry<'a>) { self.entries.push(entry) } @@ -75,7 +77,7 @@ impl LogWriteBatch

{ } } -impl Default for LogWriteBatch

{ +impl Default for LogWriteBatch<'_> { fn default() -> Self { Self::new(0) } diff --git a/wal/src/manager.rs b/wal/src/manager.rs index 63deb276e2..8943a03888 100644 --- a/wal/src/manager.rs +++ b/wal/src/manager.rs @@ -10,7 +10,7 @@ use common_util::runtime::Runtime; use snafu::ResultExt; use crate::{ - log_batch::{LogEntry, LogWriteBatch, Payload, PayloadDecoder}, + log_batch::{LogEntry, LogWriteBatch, PayloadDecoder}, manager, }; @@ -129,19 +129,6 @@ impl Default for WriteContext { } } -/// Write abstraction for log entries in Wal. -#[async_trait] -pub trait LogWriter { - /// Write a batch of log entries to log. - /// - /// Returns the max sequence number for the batch of log entries. - async fn write( - &self, - ctx: &WriteContext, - batch: &LogWriteBatch

, - ) -> Result; -} - #[derive(Debug, Clone)] pub struct ReadContext { /// Timeout to read log entries and it only takes effect when reading from a @@ -246,7 +233,7 @@ pub trait BatchLogIterator { /// Every region has its own increasing (and maybe hallow) sequence number /// space. #[async_trait] -pub trait WalManager: LogWriter + fmt::Debug { +pub trait WalManager: fmt::Debug { /// Get current sequence number. async fn sequence_num(&self, region_id: RegionId) -> Result; @@ -267,6 +254,11 @@ pub trait WalManager: LogWriter + fmt::Debug { ctx: &ReadContext, req: &ReadRequest, ) -> Result; + + /// Write a batch of log entries to log. + /// + /// Returns the max sequence number for the batch of log entries. + async fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch<'_>) -> Result; } /// Adapter to convert a blocking interator to a batch async iterator. diff --git a/wal/src/rocks_impl/encoding.rs b/wal/src/rocks_impl/encoding.rs index 7f1706602a..ef1a4a1c75 100644 --- a/wal/src/rocks_impl/encoding.rs +++ b/wal/src/rocks_impl/encoding.rs @@ -51,7 +51,7 @@ impl LogEncoding { Ok(()) } - pub fn encode_value(&self, buf: &mut BytesMut, payload: &impl Payload) -> manager::Result<()> { + pub fn encode_value(&self, buf: &mut BytesMut, payload: &dyn Payload) -> manager::Result<()> { buf.clear(); buf.reserve(self.value_enc.estimate_encoded_size(payload)); self.value_enc diff --git a/wal/src/rocks_impl/manager.rs b/wal/src/rocks_impl/manager.rs index f40ae6ceca..276b894007 100644 --- a/wal/src/rocks_impl/manager.rs +++ b/wal/src/rocks_impl/manager.rs @@ -23,10 +23,10 @@ use tokio::sync::Mutex; use crate::{ kv_encoder::{LogKey, MaxSeqMetaEncoding, MaxSeqMetaValue, MetaKey}, - log_batch::{LogEntry, LogWriteBatch, Payload}, + log_batch::{LogEntry, LogWriteBatch}, manager::{ - error::*, BatchLogIteratorAdapter, BlockingLogIterator, LogWriter, ReadContext, - ReadRequest, RegionId, WalManager, WriteContext, MAX_REGION_ID, + error::*, BatchLogIteratorAdapter, BlockingLogIterator, ReadContext, ReadRequest, RegionId, + WalManager, WriteContext, MAX_REGION_ID, }, rocks_impl::encoding::LogEncoding, }; @@ -160,7 +160,7 @@ impl Region { Ok(log_iter) } - async fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch

) -> Result { + async fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch<'_>) -> Result { debug!( "Wal region begin writing, ctx:{:?}, log_entries_num:{}", ctx, @@ -178,7 +178,7 @@ impl Region { self.log_encoding .encode_key(&mut key_buf, &(batch.region_id, next_sequence_num))?; self.log_encoding - .encode_value(&mut value_buf, &entry.payload)?; + .encode_value(&mut value_buf, entry.payload)?; wb.put(&key_buf, &value_buf) .map_err(|e| e.into()) .context(Write)?; @@ -574,18 +574,6 @@ impl BlockingLogIterator for RocksLogIterator { } } -#[async_trait] -impl LogWriter for RocksImpl { - async fn write( - &self, - ctx: &WriteContext, - batch: &LogWriteBatch

, - ) -> Result { - let region = self.get_or_create_region(batch.region_id); - region.write(ctx, batch).await - } -} - #[async_trait] impl WalManager for RocksImpl { async fn sequence_num(&self, region_id: RegionId) -> Result { @@ -614,7 +602,6 @@ impl WalManager for RocksImpl { Ok(()) } - /// Provide iterator on necessary entries according to `ReadRequest`. async fn read_batch( &self, ctx: &ReadContext, @@ -634,6 +621,11 @@ impl WalManager for RocksImpl { ctx.batch_size, )) } + + async fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch<'_>) -> Result { + let region = self.get_or_create_region(batch.region_id); + region.write(ctx, batch).await + } } impl fmt::Debug for RocksImpl { diff --git a/wal/src/table_kv_impl/encoding.rs b/wal/src/table_kv_impl/encoding.rs index b160e8b665..97c76fd475 100644 --- a/wal/src/table_kv_impl/encoding.rs +++ b/wal/src/table_kv_impl/encoding.rs @@ -125,7 +125,7 @@ impl LogEncoding { Ok(()) } - pub fn encode_value(&self, buf: &mut BytesMut, payload: &impl Payload) -> Result<()> { + pub fn encode_value(&self, buf: &mut BytesMut, payload: &dyn Payload) -> Result<()> { buf.clear(); buf.reserve(self.value_enc.estimate_encoded_size(payload)); self.value_enc.encode(buf, payload) diff --git a/wal/src/table_kv_impl/namespace.rs b/wal/src/table_kv_impl/namespace.rs index 59a2d716e1..d7023710e4 100644 --- a/wal/src/table_kv_impl/namespace.rs +++ b/wal/src/table_kv_impl/namespace.rs @@ -16,7 +16,7 @@ use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_kv::{ScanIter, TableError, TableKv, WriteBatch, WriteContext}; use crate::{ - log_batch::{LogWriteBatch, Payload}, + log_batch::LogWriteBatch, manager::{self, ReadContext, ReadRequest, RegionId, SequenceNumber}, table_kv_impl::{ consts, encoding, @@ -545,10 +545,10 @@ impl NamespaceInner { } /// Write log to this namespace. - async fn write_log( + async fn write_log( &self, ctx: &manager::WriteContext, - batch: &LogWriteBatch

, + batch: &LogWriteBatch<'_>, ) -> Result { let region_id = batch.region_id; let now = Timestamp::now(); @@ -1005,10 +1005,10 @@ impl Namespace { // Async operations. impl Namespace { /// Write log to this namespace. - pub async fn write_log( + pub async fn write_log( &self, ctx: &manager::WriteContext, - batch: &LogWriteBatch

, + batch: &LogWriteBatch<'_>, ) -> Result { self.inner.write_log(ctx, batch).await } @@ -1697,7 +1697,7 @@ mod tests { for val in start_sequence..end_sequence { let mut wb = LogWriteBatch::new(region_id); let payload = TestPayload { val }; - wb.push(LogWriteEntry { payload }); + wb.push(LogWriteEntry { payload: &payload }); last_sequence = namespace.write_log(&write_ctx, &wb).await.unwrap(); } diff --git a/wal/src/table_kv_impl/region.rs b/wal/src/table_kv_impl/region.rs index c05869aac2..f976d02c1a 100644 --- a/wal/src/table_kv_impl/region.rs +++ b/wal/src/table_kv_impl/region.rs @@ -24,7 +24,7 @@ use tokio::sync::Mutex; use crate::{ kv_encoder::LogKey, - log_batch::{LogEntry, LogWriteBatch, Payload}, + log_batch::{LogEntry, LogWriteBatch}, manager::{self, BlockingLogIterator, ReadContext, ReadRequest, RegionId, SequenceNumber}, table_kv_impl::{ encoding, encoding::LogEncoding, model::RegionEntry, namespace::BucketRef, WalRuntimes, @@ -255,12 +255,12 @@ impl Region { .context(RuntimeExec)? } - pub async fn write_log( + pub async fn write_log( &self, table_kv: &T, bucket: &BucketRef, ctx: &manager::WriteContext, - log_batch: &LogWriteBatch

, + log_batch: &LogWriteBatch<'_>, ) -> Result { let mut writer = self.writer.lock().await; writer @@ -789,14 +789,14 @@ impl RegionWriter { } impl RegionWriter { - async fn write_log( + async fn write_log( &mut self, runtime: &Runtime, table_kv: &T, region_state: &RegionState, bucket: &BucketRef, ctx: &manager::WriteContext, - log_batch: &LogWriteBatch

, + log_batch: &LogWriteBatch<'_>, ) -> Result { debug!( "Wal region begin writing, ctx:{:?}, region_id:{}, log_entries_num:{}", @@ -818,7 +818,7 @@ impl RegionWriter { .encode_key(&mut key_buf, &(log_batch.region_id, next_sequence_num)) .context(LogCodec)?; log_encoding - .encode_value(&mut value_buf, &entry.payload) + .encode_value(&mut value_buf, entry.payload) .context(LogCodec)?; wb.insert(&key_buf, &value_buf); diff --git a/wal/src/table_kv_impl/wal.rs b/wal/src/table_kv_impl/wal.rs index aaa3de8527..216b4e65f9 100644 --- a/wal/src/table_kv_impl/wal.rs +++ b/wal/src/table_kv_impl/wal.rs @@ -11,10 +11,9 @@ use snafu::ResultExt; use table_kv::TableKv; use crate::{ - log_batch::{LogWriteBatch, Payload}, + log_batch::LogWriteBatch, manager::{ - self, error::*, BatchLogIteratorAdapter, LogWriter, ReadContext, ReadRequest, RegionId, - WalManager, + self, error::*, BatchLogIteratorAdapter, ReadContext, ReadRequest, RegionId, WalManager, }, table_kv_impl::{ model::NamespaceConfig, @@ -97,21 +96,6 @@ impl fmt::Debug for WalNamespaceImpl { } } -#[async_trait] -impl LogWriter for WalNamespaceImpl { - async fn write( - &self, - ctx: &manager::WriteContext, - batch: &LogWriteBatch

, - ) -> Result { - self.namespace - .write_log(ctx, batch) - .await - .map_err(|e| Box::new(e) as _) - .context(Write) - } -} - #[async_trait] impl WalManager for WalNamespaceImpl { async fn sequence_num(&self, region_id: RegionId) -> Result { @@ -162,4 +146,16 @@ impl WalManager for WalNamespaceImpl { ctx.batch_size, )) } + + async fn write( + &self, + ctx: &manager::WriteContext, + batch: &LogWriteBatch<'_>, + ) -> Result { + self.namespace + .write_log(ctx, batch) + .await + .map_err(|e| Box::new(e) as _) + .context(Write) + } } diff --git a/wal/src/tests/read_write.rs b/wal/src/tests/read_write.rs index 084700ed65..604d6358a4 100644 --- a/wal/src/tests/read_write.rs +++ b/wal/src/tests/read_write.rs @@ -6,10 +6,9 @@ use common_types::SequenceNumber; use crate::{ log_batch::LogWriteBatch, - manager::{LogWriter, ReadBoundary, ReadRequest, RegionId, WalManager}, + manager::{ReadBoundary, ReadRequest, RegionId, WalManager}, tests::util::{ - MemoryTableWalBuilder, RocksTestEnv, RocksWalBuilder, TableKvTestEnv, TestEnv, TestPayload, - WalBuilder, + MemoryTableWalBuilder, RocksTestEnv, RocksWalBuilder, TableKvTestEnv, TestEnv, WalBuilder, }, }; @@ -18,7 +17,7 @@ async fn check_write_batch_with_read_request( wal: Arc, read_req: ReadRequest, max_seq: SequenceNumber, - write_batch: &LogWriteBatch, + write_batch: &LogWriteBatch<'_>, ) { let iter = wal .read_batch(&env.read_ctx, &read_req) @@ -32,7 +31,7 @@ async fn check_write_batch( wal: Arc, region_id: RegionId, max_seq: SequenceNumber, - write_batch: &LogWriteBatch, + write_batch: &LogWriteBatch<'_>, ) { let read_req = ReadRequest { region_id, @@ -47,7 +46,8 @@ async fn simple_read_write_with_wal( wal: Arc, region_id: RegionId, ) { - let write_batch = env.build_log_batch(region_id, 0, 10); + let mut payload_batch = Vec::default(); + let write_batch = env.build_log_batch(region_id, 0, 10, &mut payload_batch); let seq = wal .write(&env.write_ctx, &write_batch) .await @@ -74,7 +74,8 @@ async fn simple_read_write(env: &TestEnv, region_id: RegionId) async fn read_with_boundary(env: &TestEnv) { let wal = env.build_wal().await; let region_id = 0; - let write_batch = env.build_log_batch(region_id, 0, 10); + let mut payload_batch = Vec::default(); + let write_batch = env.build_log_batch(region_id, 0, 10, &mut payload_batch); let end_seq = wal .write(&env.write_ctx, &write_batch) .await @@ -114,7 +115,8 @@ async fn read_with_boundary(env: &TestEnv) { start: ReadBoundary::Excluded(start_seq), end: ReadBoundary::Included(end_seq), }; - let write_batch = env.build_log_batch(region_id, 1, 10); + let mut payload_batch = Vec::default(); + let write_batch = env.build_log_batch(region_id, 1, 10, &mut payload_batch); check_write_batch_with_read_request(env, wal.clone(), read_req, end_seq, &write_batch) .await; } @@ -126,7 +128,8 @@ async fn read_with_boundary(env: &TestEnv) { start: ReadBoundary::Included(start_seq), end: ReadBoundary::Excluded(end_seq), }; - let write_batch = env.build_log_batch(region_id, 0, 9); + let mut payload_batch = Vec::default(); + let write_batch = env.build_log_batch(region_id, 0, 9, &mut payload_batch); check_write_batch_with_read_request(env, wal.clone(), read_req, end_seq - 1, &write_batch) .await; } @@ -138,7 +141,8 @@ async fn read_with_boundary(env: &TestEnv) { start: ReadBoundary::Excluded(start_seq), end: ReadBoundary::Excluded(end_seq), }; - let write_batch = env.build_log_batch(region_id, 1, 9); + let mut payload_batch = Vec::default(); + let write_batch = env.build_log_batch(region_id, 1, 9, &mut payload_batch); check_write_batch_with_read_request(env, wal.clone(), read_req, end_seq - 1, &write_batch) .await; } @@ -173,9 +177,10 @@ async fn write_multiple_regions_parallelly(env: Arc(env: &TestEnv) { let region_id = 0; + let mut payload_batch = Vec::default(); let (write_batch, seq) = { let wal = env.build_wal().await; - let write_batch = env.build_log_batch(region_id, 0, 10); + let write_batch = env.build_log_batch(region_id, 0, 10, &mut payload_batch); let seq = wal .write(&env.write_ctx, &write_batch) .await @@ -216,13 +221,14 @@ async fn complex_read_write(env: &TestEnv) { let region_id = 0; // write two batches + let (mut payload_batch1, mut payload_batch2) = (Vec::default(), Vec::default()); let (start_val, mid_val, end_val) = (0, 10, 50); - let write_batch_1 = env.build_log_batch(region_id, start_val, mid_val); + let write_batch_1 = env.build_log_batch(region_id, start_val, mid_val, &mut payload_batch1); let seq_1 = wal .write(&env.write_ctx, &write_batch_1) .await .expect("should succeed to write"); - let write_batch_2 = env.build_log_batch(region_id, mid_val, end_val); + let write_batch_2 = env.build_log_batch(region_id, mid_val, end_val, &mut payload_batch2); let seq_2 = wal .write(&env.write_ctx, &write_batch_2) .await @@ -234,15 +240,23 @@ async fn complex_read_write(env: &TestEnv) { check_write_batch(env, wal.clone(), region_id, seq_2, &write_batch_2).await; // read the whole batch - let (seq_3, write_batch_3) = (seq_2, env.build_log_batch(region_id, start_val, end_val)); + let mut payload_batch3 = Vec::default(); + let (seq_3, write_batch_3) = ( + seq_2, + env.build_log_batch(region_id, start_val, end_val, &mut payload_batch3), + ); check_write_batch(env, wal.clone(), region_id, seq_3, &write_batch_3).await; // read the part of batch1 and batch2 + let mut payload_batch4 = Vec::default(); let (seq_4, write_batch_4) = { let new_start = (start_val + mid_val) / 2; let new_end = (mid_val + end_val) / 2; let seq = seq_2 - (end_val - new_end) as u64; - (seq, env.build_log_batch(region_id, new_start, new_end)) + ( + seq, + env.build_log_batch(region_id, new_start, new_end, &mut payload_batch4), + ) }; check_write_batch(env, wal.clone(), region_id, seq_4, &write_batch_4).await; @@ -253,7 +267,8 @@ async fn complex_read_write(env: &TestEnv) { async fn simple_write_delete(env: &TestEnv) { let region_id = 0; let wal = env.build_wal().await; - let mut write_batch = env.build_log_batch(region_id, 0, 10); + let mut payload_batch = Vec::default(); + let mut write_batch = env.build_log_batch(region_id, 0, 10, &mut payload_batch); let seq = wal .write(&env.write_ctx, &write_batch) .await @@ -290,7 +305,8 @@ async fn simple_write_delete(env: &TestEnv) { async fn write_delete_half(env: &TestEnv) { let region_id = 0; let wal = env.build_wal().await; - let mut write_batch = env.build_log_batch(region_id, 0, 10); + let mut payload_batch = Vec::default(); + let mut write_batch = env.build_log_batch(region_id, 0, 10, &mut payload_batch); let seq = wal .write(&env.write_ctx, &write_batch) .await @@ -324,13 +340,15 @@ async fn write_delete_half(env: &TestEnv) { async fn write_delete_multiple_regions(env: &TestEnv) { let (region_id_1, region_id_2) = (1, 2); let wal = env.build_wal().await; - let mut write_batch_1 = env.build_log_batch(region_id_1, 0, 10); + let mut payload_batch1 = Vec::default(); + let mut write_batch_1 = env.build_log_batch(region_id_1, 0, 10, &mut payload_batch1); let seq_1 = wal .write(&env.write_ctx, &write_batch_1) .await .expect("should succeed to write"); - let write_batch_2 = env.build_log_batch(region_id_2, 10, 20); + let mut payload_batch2 = Vec::default(); + let write_batch_2 = env.build_log_batch(region_id_2, 10, 20, &mut payload_batch2); let seq_2 = wal .write(&env.write_ctx, &write_batch_2) .await @@ -361,7 +379,8 @@ async fn write_delete_multiple_regions(env: &TestEnv) { async fn sequence_increase_monotonically_multiple_writes(env: &TestEnv) { let region_id = 0; let wal = env.build_wal().await; - let write_batch = env.build_log_batch(region_id, 0, 10); + let mut payload_batch = Vec::default(); + let write_batch = env.build_log_batch(region_id, 0, 10, &mut payload_batch); let seq_1 = wal .write(&env.write_ctx, &write_batch) .await @@ -386,7 +405,8 @@ async fn sequence_increase_monotonically_multiple_writes(env: &Te async fn sequence_increase_monotonically_delete_write(env: &TestEnv) { let region_id = 0; let wal = env.build_wal().await; - let write_batch = env.build_log_batch(region_id, 0, 10); + let mut payload_batch = Vec::default(); + let write_batch = env.build_log_batch(region_id, 0, 10, &mut payload_batch); // write let seq_1 = wal .write(&env.write_ctx, &write_batch) @@ -415,7 +435,8 @@ async fn sequence_increase_monotonically_delete_write(env: &TestE async fn sequence_increase_monotonically_delete_reopen_write(env: &TestEnv) { let region_id = 0; let wal = env.build_wal().await; - let write_batch = env.build_log_batch(region_id, 0, 10); + let mut payload_batch = Vec::default(); + let write_batch = env.build_log_batch(region_id, 0, 10, &mut payload_batch); // write let seq_1 = wal .write(&env.write_ctx, &write_batch) diff --git a/wal/src/tests/util.rs b/wal/src/tests/util.rs index e5926712b7..1f7d4d222e 100644 --- a/wal/src/tests/util.rs +++ b/wal/src/tests/util.rs @@ -136,15 +136,21 @@ impl TestEnv { } /// Build the log batch with [TestPayload].val range [start, end). - pub fn build_log_batch( + pub fn build_log_batch<'a>( &self, region_id: RegionId, start: u32, end: u32, - ) -> LogWriteBatch { + payload_batch: &'a mut Vec, + ) -> LogWriteBatch<'a> { let mut write_batch = LogWriteBatch::new(region_id); + for val in start..end { let payload = TestPayload { val }; + payload_batch.push(payload); + } + + for payload in payload_batch.iter() { write_batch.entries.push(LogWriteEntry { payload }); } @@ -156,7 +162,7 @@ impl TestEnv { pub async fn check_log_entries( &self, max_seq: SequenceNumber, - write_batch: &LogWriteBatch, + write_batch: &LogWriteBatch<'_>, mut iter: BatchLogIteratorAdapter, ) { let mut log_entries = VecDeque::with_capacity(write_batch.entries.len()); @@ -182,8 +188,22 @@ impl TestEnv { .rev() .enumerate() { + // sequence assert_eq!(max_seq - idx as u64, log_entry.sequence); - assert_eq!(expect_log_write_entry.payload, log_entry.payload); + + // payload + let (mut expected_buf, mut buf) = (Vec::new(), Vec::new()); + expect_log_write_entry + .payload + .encode_to(&mut expected_buf) + .unwrap(); + log_entry.payload.encode_to(&mut buf).unwrap(); + + assert_eq!( + expect_log_write_entry.payload.encode_size(), + log_entry.payload.encode_size() + ); + assert_eq!(expected_buf, buf); } } } @@ -195,13 +215,14 @@ pub struct TestPayload { } impl Payload for TestPayload { - type Error = Error; - fn encode_size(&self) -> usize { 4 } - fn encode_to(&self, buf: &mut B) -> Result<(), Self::Error> { + fn encode_to( + &self, + buf: &mut dyn MemBufMut, + ) -> Result<(), Box> { buf.write_u32(self.val).expect("must write"); Ok(()) }