Skip to content

Commit

Permalink
refactor: make write method object safe in WalManager (#159)
Browse files Browse the repository at this point in the history
* refactor: make write method object safe in WalManager

* fix: clean unused code

* fix: remove unused trait impl

* fix: remove generic param in Writer
  • Loading branch information
ygf11 authored Jul 30, 2022
1 parent b5f9114 commit 5bfb577
Show file tree
Hide file tree
Showing 16 changed files with 166 additions and 134 deletions.
5 changes: 2 additions & 3 deletions analytic_engine/src/instance/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,8 @@ where

let mut log_batch = LogWriteBatch::new(table_data.wal_region_id());
// Now we only have one request, so no need to use with_capacity
log_batch.push(LogWriteEntry {
payload: WritePayload::Write(&write_req_pb),
});
let payload = WritePayload::Write(&write_req_pb);
log_batch.push(LogWriteEntry { payload: &payload });

// Write to wal manager
let write_ctx = WriteContext::default();
Expand Down
17 changes: 10 additions & 7 deletions analytic_engine/src/meta/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,8 @@ impl<W: WalManager + Send + Sync> ManifestImpl<W> {

let region_id = Self::region_id_of_meta_update(&update);
let mut log_batch = LogWriteBatch::new(region_id);
log_batch.push(LogWriteEntry {
payload: MetaUpdatePayload::from(MetaUpdateLogEntry::Normal(update)),
});
let payload: MetaUpdatePayload = MetaUpdateLogEntry::Normal(update).into();
log_batch.push(LogWriteEntry { payload: &payload });

let write_ctx = WriteContext::default();

Expand Down Expand Up @@ -311,10 +310,15 @@ impl<W: WalManager + Send + Sync> MetaUpdateLogStore for RegionWal<W> {

async fn store(&self, log_entries: &[MetaUpdateLogEntry]) -> Result<()> {
let mut log_batch = LogWriteBatch::new(self.region_id);
let mut payload_batch = Vec::with_capacity(log_entries.len());

for entry in log_entries {
log_batch.push(LogWriteEntry {
payload: MetaUpdatePayload::from(entry),
});
let payload = MetaUpdatePayload::from(entry);
payload_batch.push(payload);
}

for payload in payload_batch.iter() {
log_batch.push(LogWriteEntry { payload });
}

let write_ctx = WriteContext::default();
Expand Down Expand Up @@ -855,7 +859,6 @@ mod tests {
manifest_data_builder: &mut TableManifestDataBuilder,
) {
let manifest = self.open_manifest().await;

self.add_table_with_manifest(table_id, manifest_data_builder, &manifest)
.await;
}
Expand Down
14 changes: 10 additions & 4 deletions analytic_engine/src/meta/meta_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,15 +434,21 @@ impl From<&MetaUpdateLogEntry> for MetaUpdatePayload {
}

impl Payload for MetaUpdatePayload {
type Error = Error;

fn encode_size(&self) -> usize {
self.0.compute_size().try_into().unwrap_or(0)
}

fn encode_to<B: MemBufMut>(&self, buf: &mut B) -> Result<()> {
fn encode_to(
&self,
buf: &mut dyn MemBufMut,
) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut writer = Writer::new(buf);
self.0.write_to_writer(&mut writer).context(EncodePayloadPb)
self.0
.write_to_writer(&mut writer)
.context(EncodePayloadPb)
.map_err(Box::new)?;

Ok(())
}
}

Expand Down
13 changes: 8 additions & 5 deletions analytic_engine/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl Header {
}
}

fn write_header<B: MemBufMut>(header: Header, buf: &mut B) -> Result<()> {
fn write_header(header: Header, buf: &mut dyn MemBufMut) -> Result<()> {
buf.write_u8(header.to_u8()).context(EncodeHeader)?;
Ok(())
}
Expand All @@ -90,8 +90,6 @@ pub enum WritePayload<'a> {
}

impl<'a> Payload for WritePayload<'a> {
type Error = Error;

fn encode_size(&self) -> usize {
let body_size = match self {
WritePayload::Write(req) => req.compute_size(),
Expand All @@ -100,12 +98,17 @@ impl<'a> Payload for WritePayload<'a> {
HEADER_SIZE + body_size as usize
}

fn encode_to<B: MemBufMut>(&self, buf: &mut B) -> Result<()> {
fn encode_to(
&self,
buf: &mut dyn MemBufMut,
) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
match self {
WritePayload::Write(req) => {
write_header(Header::Write, buf)?;
let mut writer = Writer::new(buf);
req.write_to_writer(&mut writer).context(EncodeBody)?;
req.write_to_writer(&mut writer)
.context(EncodeBody)
.map_err(Box::new)?;
}
}

Expand Down
10 changes: 5 additions & 5 deletions components/bytes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,18 +220,18 @@ impl MemBufMut for Vec<u8> {

/// A `MemBufMut` adapter which implements [std::io::Write] for the inner value
#[derive(Debug)]
pub struct Writer<'a, B> {
buf: &'a mut B,
pub struct Writer<'a> {
buf: &'a mut dyn MemBufMut,
}

impl<'a, B: MemBufMut> Writer<'a, B> {
impl<'a> Writer<'a> {
/// Create a new Writer from a mut ref to buf
pub fn new(buf: &'a mut B) -> Self {
pub fn new(buf: &'a mut dyn MemBufMut) -> Self {
Self { buf }
}
}

impl<'a, B: MemBufMut> Write for Writer<'a, B> {
impl<'a> Write for Writer<'a> {
fn write(&mut self, src: &[u8]) -> io::Result<usize> {
self.buf.write_slice(src).map_err(|e| match &e {
Error::UnexpectedEof { .. } => io::Error::new(io::ErrorKind::UnexpectedEof, e),
Expand Down
11 changes: 4 additions & 7 deletions wal/src/kv_encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,23 +214,20 @@ impl LogValueEncoder {
}
}

impl<T: Payload> Encoder<T> for LogValueEncoder {
type Error = Error;

impl LogValueEncoder {
/// Value format:
/// +--------------------+---------+
/// | version_header(u8) | payload |
/// +--------------------+---------+
fn encode<B: MemBufMut>(&self, buf: &mut B, payload: &T) -> Result<()> {
pub fn encode<B: MemBufMut>(&self, buf: &mut B, payload: &dyn Payload) -> Result<()> {
buf.write_u8(self.version).context(EncodeLogValueHeader)?;

payload
.encode_to(buf)
.map_err(|e| Box::new(e) as _)
.encode_to(buf as &mut dyn MemBufMut)
.context(EncodeLogValuePayload)
}

fn estimate_encoded_size(&self, payload: &T) -> usize {
pub fn estimate_encoded_size(&self, payload: &dyn Payload) -> usize {
// Refer to value format.
1 + payload.encode_size()
}
Expand Down
20 changes: 11 additions & 9 deletions wal/src/log_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ use common_types::{
use crate::manager::RegionId;

pub trait Payload: Send + Sync + Debug {
type Error: std::error::Error + Send + Sync + 'static;
/// Compute size of the encoded payload.
fn encode_size(&self) -> usize;
/// Append the encoded payload to the `buf`.
fn encode_to<B: MemBufMut>(&self, buf: &mut B) -> Result<(), Self::Error>;
fn encode_to(
&self,
buf: &mut dyn MemBufMut,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}

#[derive(Debug)]
Expand All @@ -31,18 +33,18 @@ pub struct LogEntry<P> {
/// `PayloadEncoder`. `region_id` is a logically region and set it as 0 if
/// unnecessary.
#[derive(Debug)]
pub struct LogWriteEntry<P> {
pub payload: P,
pub struct LogWriteEntry<'a> {
pub payload: &'a dyn Payload,
}

/// A batch of `LogWriteEntry`s.
#[derive(Debug)]
pub struct LogWriteBatch<P> {
pub struct LogWriteBatch<'a> {
pub(crate) region_id: RegionId,
pub(crate) entries: Vec<LogWriteEntry<P>>,
pub(crate) entries: Vec<LogWriteEntry<'a>>,
}

impl<P: Payload> LogWriteBatch<P> {
impl<'a> LogWriteBatch<'a> {
pub fn new(region_id: RegionId) -> Self {
Self::with_capacity(region_id, 0)
}
Expand All @@ -55,7 +57,7 @@ impl<P: Payload> LogWriteBatch<P> {
}

#[inline]
pub fn push(&mut self, entry: LogWriteEntry<P>) {
pub fn push(&mut self, entry: LogWriteEntry<'a>) {
self.entries.push(entry)
}

Expand All @@ -75,7 +77,7 @@ impl<P: Payload> LogWriteBatch<P> {
}
}

impl<P: Payload> Default for LogWriteBatch<P> {
impl Default for LogWriteBatch<'_> {
fn default() -> Self {
Self::new(0)
}
Expand Down
22 changes: 7 additions & 15 deletions wal/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use common_util::runtime::Runtime;
use snafu::ResultExt;

use crate::{
log_batch::{LogEntry, LogWriteBatch, Payload, PayloadDecoder},
log_batch::{LogEntry, LogWriteBatch, PayloadDecoder},
manager,
};

Expand Down Expand Up @@ -129,19 +129,6 @@ impl Default for WriteContext {
}
}

/// Write abstraction for log entries in Wal.
#[async_trait]
pub trait LogWriter {
/// Write a batch of log entries to log.
///
/// Returns the max sequence number for the batch of log entries.
async fn write<P: Payload>(
&self,
ctx: &WriteContext,
batch: &LogWriteBatch<P>,
) -> Result<SequenceNumber>;
}

#[derive(Debug, Clone)]
pub struct ReadContext {
/// Timeout to read log entries and it only takes effect when reading from a
Expand Down Expand Up @@ -246,7 +233,7 @@ pub trait BatchLogIterator {
/// Every region has its own increasing (and maybe hallow) sequence number
/// space.
#[async_trait]
pub trait WalManager: LogWriter + fmt::Debug {
pub trait WalManager: fmt::Debug {
/// Get current sequence number.
async fn sequence_num(&self, region_id: RegionId) -> Result<SequenceNumber>;

Expand All @@ -267,6 +254,11 @@ pub trait WalManager: LogWriter + fmt::Debug {
ctx: &ReadContext,
req: &ReadRequest,
) -> Result<BatchLogIteratorAdapter>;

/// Write a batch of log entries to log.
///
/// Returns the max sequence number for the batch of log entries.
async fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch<'_>) -> Result<SequenceNumber>;
}

/// Adapter to convert a blocking interator to a batch async iterator.
Expand Down
2 changes: 1 addition & 1 deletion wal/src/rocks_impl/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl LogEncoding {
Ok(())
}

pub fn encode_value(&self, buf: &mut BytesMut, payload: &impl Payload) -> manager::Result<()> {
pub fn encode_value(&self, buf: &mut BytesMut, payload: &dyn Payload) -> manager::Result<()> {
buf.clear();
buf.reserve(self.value_enc.estimate_encoded_size(payload));
self.value_enc
Expand Down
28 changes: 10 additions & 18 deletions wal/src/rocks_impl/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ use tokio::sync::Mutex;

use crate::{
kv_encoder::{LogKey, MaxSeqMetaEncoding, MaxSeqMetaValue, MetaKey},
log_batch::{LogEntry, LogWriteBatch, Payload},
log_batch::{LogEntry, LogWriteBatch},
manager::{
error::*, BatchLogIteratorAdapter, BlockingLogIterator, LogWriter, ReadContext,
ReadRequest, RegionId, WalManager, WriteContext, MAX_REGION_ID,
error::*, BatchLogIteratorAdapter, BlockingLogIterator, ReadContext, ReadRequest, RegionId,
WalManager, WriteContext, MAX_REGION_ID,
},
rocks_impl::encoding::LogEncoding,
};
Expand Down Expand Up @@ -160,7 +160,7 @@ impl Region {
Ok(log_iter)
}

async fn write<P: Payload>(&self, ctx: &WriteContext, batch: &LogWriteBatch<P>) -> Result<u64> {
async fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch<'_>) -> Result<u64> {
debug!(
"Wal region begin writing, ctx:{:?}, log_entries_num:{}",
ctx,
Expand All @@ -178,7 +178,7 @@ impl Region {
self.log_encoding
.encode_key(&mut key_buf, &(batch.region_id, next_sequence_num))?;
self.log_encoding
.encode_value(&mut value_buf, &entry.payload)?;
.encode_value(&mut value_buf, entry.payload)?;
wb.put(&key_buf, &value_buf)
.map_err(|e| e.into())
.context(Write)?;
Expand Down Expand Up @@ -574,18 +574,6 @@ impl BlockingLogIterator for RocksLogIterator {
}
}

#[async_trait]
impl LogWriter for RocksImpl {
async fn write<P: Payload>(
&self,
ctx: &WriteContext,
batch: &LogWriteBatch<P>,
) -> Result<SequenceNumber> {
let region = self.get_or_create_region(batch.region_id);
region.write(ctx, batch).await
}
}

#[async_trait]
impl WalManager for RocksImpl {
async fn sequence_num(&self, region_id: RegionId) -> Result<u64> {
Expand Down Expand Up @@ -614,7 +602,6 @@ impl WalManager for RocksImpl {
Ok(())
}

/// Provide iterator on necessary entries according to `ReadRequest`.
async fn read_batch(
&self,
ctx: &ReadContext,
Expand All @@ -634,6 +621,11 @@ impl WalManager for RocksImpl {
ctx.batch_size,
))
}

async fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch<'_>) -> Result<SequenceNumber> {
let region = self.get_or_create_region(batch.region_id);
region.write(ctx, batch).await
}
}

impl fmt::Debug for RocksImpl {
Expand Down
2 changes: 1 addition & 1 deletion wal/src/table_kv_impl/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl LogEncoding {
Ok(())
}

pub fn encode_value(&self, buf: &mut BytesMut, payload: &impl Payload) -> Result<()> {
pub fn encode_value(&self, buf: &mut BytesMut, payload: &dyn Payload) -> Result<()> {
buf.clear();
buf.reserve(self.value_enc.estimate_encoded_size(payload));
self.value_enc.encode(buf, payload)
Expand Down
Loading

0 comments on commit 5bfb577

Please sign in to comment.