Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: make write method object safe in WalManager #159

Merged
merged 5 commits into from
Jul 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions analytic_engine/src/instance/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,8 @@ where

let mut log_batch = LogWriteBatch::new(table_data.wal_region_id());
// Now we only have one request, so no need to use with_capacity
log_batch.push(LogWriteEntry {
payload: WritePayload::Write(&write_req_pb),
});
let payload = WritePayload::Write(&write_req_pb);
log_batch.push(LogWriteEntry { payload: &payload });

// Write to wal manager
let write_ctx = WriteContext::default();
Expand Down
17 changes: 10 additions & 7 deletions analytic_engine/src/meta/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,8 @@ impl<W: WalManager + Send + Sync> ManifestImpl<W> {

let region_id = Self::region_id_of_meta_update(&update);
let mut log_batch = LogWriteBatch::new(region_id);
log_batch.push(LogWriteEntry {
payload: MetaUpdatePayload::from(MetaUpdateLogEntry::Normal(update)),
});
let payload: MetaUpdatePayload = MetaUpdateLogEntry::Normal(update).into();
log_batch.push(LogWriteEntry { payload: &payload });

let write_ctx = WriteContext::default();

Expand Down Expand Up @@ -311,10 +310,15 @@ impl<W: WalManager + Send + Sync> MetaUpdateLogStore for RegionWal<W> {

async fn store(&self, log_entries: &[MetaUpdateLogEntry]) -> Result<()> {
let mut log_batch = LogWriteBatch::new(self.region_id);
let mut payload_batch = Vec::with_capacity(log_entries.len());

for entry in log_entries {
log_batch.push(LogWriteEntry {
payload: MetaUpdatePayload::from(entry),
});
let payload = MetaUpdatePayload::from(entry);
payload_batch.push(payload);
}

for payload in payload_batch.iter() {
log_batch.push(LogWriteEntry { payload });
}

let write_ctx = WriteContext::default();
Expand Down Expand Up @@ -855,7 +859,6 @@ mod tests {
manifest_data_builder: &mut TableManifestDataBuilder,
) {
let manifest = self.open_manifest().await;

self.add_table_with_manifest(table_id, manifest_data_builder, &manifest)
.await;
}
Expand Down
14 changes: 10 additions & 4 deletions analytic_engine/src/meta/meta_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,15 +434,21 @@ impl From<&MetaUpdateLogEntry> for MetaUpdatePayload {
}

impl Payload for MetaUpdatePayload {
type Error = Error;

fn encode_size(&self) -> usize {
self.0.compute_size().try_into().unwrap_or(0)
}

fn encode_to<B: MemBufMut>(&self, buf: &mut B) -> Result<()> {
fn encode_to(
&self,
buf: &mut dyn MemBufMut,
) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut writer = Writer::new(buf);
self.0.write_to_writer(&mut writer).context(EncodePayloadPb)
self.0
.write_to_writer(&mut writer)
.context(EncodePayloadPb)
.map_err(Box::new)?;

Ok(())
}
}

Expand Down
13 changes: 8 additions & 5 deletions analytic_engine/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl Header {
}
}

fn write_header<B: MemBufMut>(header: Header, buf: &mut B) -> Result<()> {
fn write_header(header: Header, buf: &mut dyn MemBufMut) -> Result<()> {
buf.write_u8(header.to_u8()).context(EncodeHeader)?;
Ok(())
}
Expand All @@ -90,8 +90,6 @@ pub enum WritePayload<'a> {
}

impl<'a> Payload for WritePayload<'a> {
type Error = Error;

fn encode_size(&self) -> usize {
let body_size = match self {
WritePayload::Write(req) => req.compute_size(),
Expand All @@ -100,12 +98,17 @@ impl<'a> Payload for WritePayload<'a> {
HEADER_SIZE + body_size as usize
}

fn encode_to<B: MemBufMut>(&self, buf: &mut B) -> Result<()> {
fn encode_to(
&self,
buf: &mut dyn MemBufMut,
) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> {
match self {
WritePayload::Write(req) => {
write_header(Header::Write, buf)?;
let mut writer = Writer::new(buf);
req.write_to_writer(&mut writer).context(EncodeBody)?;
req.write_to_writer(&mut writer)
.context(EncodeBody)
.map_err(Box::new)?;
}
}

Expand Down
10 changes: 5 additions & 5 deletions components/bytes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,18 +220,18 @@ impl MemBufMut for Vec<u8> {

/// A `MemBufMut` adapter which implements [std::io::Write] for the inner value
#[derive(Debug)]
pub struct Writer<'a, B> {
buf: &'a mut B,
pub struct Writer<'a> {
buf: &'a mut dyn MemBufMut,
}

impl<'a, B: MemBufMut> Writer<'a, B> {
impl<'a> Writer<'a> {
/// Create a new Writer from a mut ref to buf
pub fn new(buf: &'a mut B) -> Self {
pub fn new(buf: &'a mut dyn MemBufMut) -> Self {
Self { buf }
}
}

impl<'a, B: MemBufMut> Write for Writer<'a, B> {
impl<'a> Write for Writer<'a> {
fn write(&mut self, src: &[u8]) -> io::Result<usize> {
self.buf.write_slice(src).map_err(|e| match &e {
Error::UnexpectedEof { .. } => io::Error::new(io::ErrorKind::UnexpectedEof, e),
Expand Down
11 changes: 4 additions & 7 deletions wal/src/kv_encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,23 +214,20 @@ impl LogValueEncoder {
}
}

impl<T: Payload> Encoder<T> for LogValueEncoder {
type Error = Error;

impl LogValueEncoder {
/// Value format:
/// +--------------------+---------+
/// | version_header(u8) | payload |
/// +--------------------+---------+
fn encode<B: MemBufMut>(&self, buf: &mut B, payload: &T) -> Result<()> {
pub fn encode<B: MemBufMut>(&self, buf: &mut B, payload: &dyn Payload) -> Result<()> {
buf.write_u8(self.version).context(EncodeLogValueHeader)?;

payload
.encode_to(buf)
.map_err(|e| Box::new(e) as _)
.encode_to(buf as &mut dyn MemBufMut)
.context(EncodeLogValuePayload)
}

fn estimate_encoded_size(&self, payload: &T) -> usize {
pub fn estimate_encoded_size(&self, payload: &dyn Payload) -> usize {
// Refer to value format.
1 + payload.encode_size()
}
Expand Down
20 changes: 11 additions & 9 deletions wal/src/log_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ use common_types::{
use crate::manager::RegionId;

pub trait Payload: Send + Sync + Debug {
type Error: std::error::Error + Send + Sync + 'static;
/// Compute size of the encoded payload.
fn encode_size(&self) -> usize;
/// Append the encoded payload to the `buf`.
fn encode_to<B: MemBufMut>(&self, buf: &mut B) -> Result<(), Self::Error>;
fn encode_to(
&self,
buf: &mut dyn MemBufMut,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}

#[derive(Debug)]
Expand All @@ -31,18 +33,18 @@ pub struct LogEntry<P> {
/// `PayloadEncoder`. `region_id` is a logically region and set it as 0 if
/// unnecessary.
#[derive(Debug)]
pub struct LogWriteEntry<P> {
pub payload: P,
pub struct LogWriteEntry<'a> {
pub payload: &'a dyn Payload,
}

/// A batch of `LogWriteEntry`s.
#[derive(Debug)]
pub struct LogWriteBatch<P> {
pub struct LogWriteBatch<'a> {
pub(crate) region_id: RegionId,
pub(crate) entries: Vec<LogWriteEntry<P>>,
pub(crate) entries: Vec<LogWriteEntry<'a>>,
}

impl<P: Payload> LogWriteBatch<P> {
impl<'a> LogWriteBatch<'a> {
pub fn new(region_id: RegionId) -> Self {
Self::with_capacity(region_id, 0)
}
Expand All @@ -55,7 +57,7 @@ impl<P: Payload> LogWriteBatch<P> {
}

#[inline]
pub fn push(&mut self, entry: LogWriteEntry<P>) {
pub fn push(&mut self, entry: LogWriteEntry<'a>) {
self.entries.push(entry)
}

Expand All @@ -75,7 +77,7 @@ impl<P: Payload> LogWriteBatch<P> {
}
}

impl<P: Payload> Default for LogWriteBatch<P> {
impl Default for LogWriteBatch<'_> {
fn default() -> Self {
Self::new(0)
}
Expand Down
22 changes: 7 additions & 15 deletions wal/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use common_util::runtime::Runtime;
use snafu::ResultExt;

use crate::{
log_batch::{LogEntry, LogWriteBatch, Payload, PayloadDecoder},
log_batch::{LogEntry, LogWriteBatch, PayloadDecoder},
manager,
};

Expand Down Expand Up @@ -129,19 +129,6 @@ impl Default for WriteContext {
}
}

/// Write abstraction for log entries in Wal.
#[async_trait]
pub trait LogWriter {
/// Write a batch of log entries to log.
///
/// Returns the max sequence number for the batch of log entries.
async fn write<P: Payload>(
&self,
ctx: &WriteContext,
batch: &LogWriteBatch<P>,
) -> Result<SequenceNumber>;
}

#[derive(Debug, Clone)]
pub struct ReadContext {
/// Timeout to read log entries and it only takes effect when reading from a
Expand Down Expand Up @@ -246,7 +233,7 @@ pub trait BatchLogIterator {
/// Every region has its own increasing (and maybe hallow) sequence number
/// space.
#[async_trait]
pub trait WalManager: LogWriter + fmt::Debug {
pub trait WalManager: fmt::Debug {
/// Get current sequence number.
async fn sequence_num(&self, region_id: RegionId) -> Result<SequenceNumber>;

Expand All @@ -267,6 +254,11 @@ pub trait WalManager: LogWriter + fmt::Debug {
ctx: &ReadContext,
req: &ReadRequest,
) -> Result<BatchLogIteratorAdapter>;

/// Write a batch of log entries to log.
///
/// Returns the max sequence number for the batch of log entries.
async fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch<'_>) -> Result<SequenceNumber>;
}

/// Adapter to convert a blocking interator to a batch async iterator.
Expand Down
2 changes: 1 addition & 1 deletion wal/src/rocks_impl/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl LogEncoding {
Ok(())
}

pub fn encode_value(&self, buf: &mut BytesMut, payload: &impl Payload) -> manager::Result<()> {
pub fn encode_value(&self, buf: &mut BytesMut, payload: &dyn Payload) -> manager::Result<()> {
buf.clear();
buf.reserve(self.value_enc.estimate_encoded_size(payload));
self.value_enc
Expand Down
28 changes: 10 additions & 18 deletions wal/src/rocks_impl/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ use tokio::sync::Mutex;

use crate::{
kv_encoder::{LogKey, MaxSeqMetaEncoding, MaxSeqMetaValue, MetaKey},
log_batch::{LogEntry, LogWriteBatch, Payload},
log_batch::{LogEntry, LogWriteBatch},
manager::{
error::*, BatchLogIteratorAdapter, BlockingLogIterator, LogWriter, ReadContext,
ReadRequest, RegionId, WalManager, WriteContext, MAX_REGION_ID,
error::*, BatchLogIteratorAdapter, BlockingLogIterator, ReadContext, ReadRequest, RegionId,
WalManager, WriteContext, MAX_REGION_ID,
},
rocks_impl::encoding::LogEncoding,
};
Expand Down Expand Up @@ -160,7 +160,7 @@ impl Region {
Ok(log_iter)
}

async fn write<P: Payload>(&self, ctx: &WriteContext, batch: &LogWriteBatch<P>) -> Result<u64> {
async fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch<'_>) -> Result<u64> {
debug!(
"Wal region begin writing, ctx:{:?}, log_entries_num:{}",
ctx,
Expand All @@ -178,7 +178,7 @@ impl Region {
self.log_encoding
.encode_key(&mut key_buf, &(batch.region_id, next_sequence_num))?;
self.log_encoding
.encode_value(&mut value_buf, &entry.payload)?;
.encode_value(&mut value_buf, entry.payload)?;
wb.put(&key_buf, &value_buf)
.map_err(|e| e.into())
.context(Write)?;
Expand Down Expand Up @@ -574,18 +574,6 @@ impl BlockingLogIterator for RocksLogIterator {
}
}

#[async_trait]
impl LogWriter for RocksImpl {
async fn write<P: Payload>(
&self,
ctx: &WriteContext,
batch: &LogWriteBatch<P>,
) -> Result<SequenceNumber> {
let region = self.get_or_create_region(batch.region_id);
region.write(ctx, batch).await
}
}

#[async_trait]
impl WalManager for RocksImpl {
async fn sequence_num(&self, region_id: RegionId) -> Result<u64> {
Expand Down Expand Up @@ -614,7 +602,6 @@ impl WalManager for RocksImpl {
Ok(())
}

/// Provide iterator on necessary entries according to `ReadRequest`.
async fn read_batch(
&self,
ctx: &ReadContext,
Expand All @@ -634,6 +621,11 @@ impl WalManager for RocksImpl {
ctx.batch_size,
))
}

async fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch<'_>) -> Result<SequenceNumber> {
let region = self.get_or_create_region(batch.region_id);
region.write(ctx, batch).await
}
}

impl fmt::Debug for RocksImpl {
Expand Down
2 changes: 1 addition & 1 deletion wal/src/table_kv_impl/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl LogEncoding {
Ok(())
}

pub fn encode_value(&self, buf: &mut BytesMut, payload: &impl Payload) -> Result<()> {
pub fn encode_value(&self, buf: &mut BytesMut, payload: &dyn Payload) -> Result<()> {
buf.clear();
buf.reserve(self.value_enc.estimate_encoded_size(payload));
self.value_enc.encode(buf, payload)
Expand Down
Loading