diff --git a/analytic_engine/src/instance/alter.rs b/analytic_engine/src/instance/alter.rs index 1e4f572905..d637a56e18 100644 --- a/analytic_engine/src/instance/alter.rs +++ b/analytic_engine/src/instance/alter.rs @@ -22,7 +22,7 @@ use crate::{ write_worker::{AlterOptionsCommand, AlterSchemaCommand, WorkerLocal}, Instance, }, - meta::meta_update::{AlterOptionsMeta, AlterSchemaMeta, MetaUpdate}, + meta::meta_update::{AlterOptionsMeta, AlterSchemaMeta, MetaUpdate, MetaUpdateRequest}, payload::WritePayload, space::SpaceAndTable, table::data::TableDataRef, @@ -122,19 +122,19 @@ impl Instance { let payload = WritePayload::AlterSchema(&alter_schema_pb); // Encode payloads - let region_id = table_data.wal_region_id(); + let region_id = table_data.location(); let log_batch_encoder = self.space_store .wal_manager .encoder(region_id) .context(GetLogBatchEncoder { table: &table_data.name, - region_id: table_data.wal_region_id(), + table_location: table_data.location(), })?; let log_batch = log_batch_encoder.encode(&payload).context(EncodePayloads { table: &table_data.name, - region_id: table_data.wal_region_id(), + table_location: table_data.location(), })?; // Write log batch @@ -159,7 +159,7 @@ impl Instance { let update = MetaUpdate::AlterSchema(manifest_update); self.space_store .manifest - .store_update(update) + .store_update(MetaUpdateRequest::new(table_data.location(), update)) .await .context(WriteManifest { space_id: table_data.space_id, @@ -290,19 +290,19 @@ impl Instance { let payload = WritePayload::AlterOption(&alter_options_pb); // Encode payload - let region_id = table_data.wal_region_id(); + let region_id = table_data.location(); let log_batch_encoder = self.space_store .wal_manager .encoder(region_id) .context(GetLogBatchEncoder { table: &table_data.name, - region_id: table_data.wal_region_id(), + table_location: table_data.location(), })?; let log_batch = log_batch_encoder.encode(&payload).context(EncodePayloads { table: &table_data.name, - region_id: table_data.wal_region_id(), + table_location: table_data.location(), })?; // Write log batch @@ -322,7 +322,7 @@ impl Instance { let meta_update = MetaUpdate::AlterOptions(manifest_update); self.space_store .manifest - .store_update(meta_update) + .store_update(MetaUpdateRequest::new(table_data.location(), meta_update)) .await .context(WriteManifest { space_id: table_data.space_id, diff --git a/analytic_engine/src/instance/create.rs b/analytic_engine/src/instance/create.rs index c4d6c23ee2..8959f4e4b6 100644 --- a/analytic_engine/src/instance/create.rs +++ b/analytic_engine/src/instance/create.rs @@ -15,7 +15,7 @@ use crate::{ write_worker::{self, CreateTableCommand, WorkerLocal}, Instance, }, - meta::meta_update::{AddTableMeta, MetaUpdate}, + meta::meta_update::{AddTableMeta, MetaUpdate, MetaUpdateRequest}, space::SpaceRef, table::data::{TableData, TableDataRef}, table_options, @@ -104,7 +104,7 @@ impl Instance { }); self.space_store .manifest - .store_update(update) + .store_update(MetaUpdateRequest::new(table_data.location(), update)) .await .context(WriteManifest { space_id: space.id, diff --git a/analytic_engine/src/instance/drop.rs b/analytic_engine/src/instance/drop.rs index 4f0ed1ba20..b89a268601 100644 --- a/analytic_engine/src/instance/drop.rs +++ b/analytic_engine/src/instance/drop.rs @@ -16,7 +16,7 @@ use crate::{ write_worker::{self, DropTableCommand, WorkerLocal}, Instance, }, - meta::meta_update::{DropTableMeta, MetaUpdate}, + meta::meta_update::{DropTableMeta, MetaUpdate, MetaUpdateRequest}, space::SpaceRef, }; @@ 
-111,7 +111,7 @@ impl Instance { }); self.space_store .manifest - .store_update(update) + .store_update(MetaUpdateRequest::new(table_data.location(), update)) .await .context(WriteManifest { space_id: space.id, diff --git a/analytic_engine/src/instance/engine.rs b/analytic_engine/src/instance/engine.rs index 95607eb512..e7a5f4b40f 100644 --- a/analytic_engine/src/instance/engine.rs +++ b/analytic_engine/src/instance/engine.rs @@ -4,14 +4,13 @@ use std::sync::Arc; -use common_types::schema::Version; +use common_types::{schema::Version, table::Location}; use common_util::define_result; use snafu::{Backtrace, OptionExt, Snafu}; use table_engine::{ engine::{CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest}, table::TableId, }; -use wal::manager::RegionId; use crate::{ instance::{write_worker::WriteGroup, Instance}, @@ -192,26 +191,26 @@ pub enum Error { }, #[snafu(display( - "Failed to get to log batch encoder, table:{}, region_id:{}, err:{}", + "Failed to get to log batch encoder, table:{}, table_location:{:?}, err:{}", table, - region_id, + table_location, source ))] GetLogBatchEncoder { table: String, - region_id: RegionId, + table_location: Location, source: wal::manager::Error, }, #[snafu(display( - "Failed to encode payloads, table:{},region_id:{}, err:{}", + "Failed to encode payloads, table:{},table_location:{:?}, err:{}", table, - region_id, + table_location, source ))] EncodePayloads { table: String, - region_id: RegionId, + table_location: Location, source: wal::manager::Error, }, } @@ -332,7 +331,7 @@ impl Instance { ) -> Result> { let space = self.find_or_create_space(space_id).await?; - let table_data = self.do_open_table(space.clone(), request.table_id).await?; + let table_data = self.do_open_table(space.clone(), request).await?; Ok(table_data.map(|v| SpaceAndTable::new(space, v))) } diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index 91608dee52..04bf152b73 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -9,6 +9,7 @@ use common_types::{ record_batch::{RecordBatchWithKey, RecordBatchWithKeyBuilder}, request_id::RequestId, row::RowViewOnBatch, + table::Location, time::TimeRange, SequenceNumber, }; @@ -22,7 +23,6 @@ use log::{error, info}; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::{predicate::Predicate, table::Result as TableResult}; use tokio::sync::oneshot; -use wal::manager::RegionId; use crate::{ compaction::{ @@ -33,7 +33,7 @@ use crate::{ Instance, SpaceStore, }, memtable::{ColumnarIterPtr, MemTableRef, ScanContext, ScanRequest}, - meta::meta_update::{AlterOptionsMeta, MetaUpdate, VersionEditMeta}, + meta::meta_update::{AlterOptionsMeta, MetaUpdate, MetaUpdateRequest, VersionEditMeta}, row_iter::{ self, dedup::DedupIterator, @@ -63,9 +63,13 @@ pub enum Error { source: Box, }, - #[snafu(display("Failed to purge wal, region_id:{}, sequence:{}", region_id, sequence))] + #[snafu(display( + "Failed to purge wal, table_location:{:?}, sequence:{}", + table_location, + sequence + ))] PurgeWal { - region_id: RegionId, + table_location: Location, sequence: SequenceNumber, source: wal::manager::Error, }, @@ -296,7 +300,7 @@ impl Instance { }); self.space_store .manifest - .store_update(meta_update) + .store_update(MetaUpdateRequest::new(table_data.location(), meta_update)) .await .context(StoreVersionEdit)?; @@ -536,7 +540,7 @@ impl Instance { let meta_update = MetaUpdate::VersionEdit(edit_meta); 
self.space_store .manifest - .store_update(meta_update) + .store_update(MetaUpdateRequest::new(table_data.location(), meta_update)) .await .context(StoreVersionEdit)?; @@ -553,10 +557,10 @@ impl Instance { // Mark sequence <= flushed_sequence to be deleted. self.space_store .wal_manager - .mark_delete_entries_up_to(table_data.wal_region_id(), flushed_sequence) + .mark_delete_entries_up_to(table_data.location(), flushed_sequence) .await .context(PurgeWal { - region_id: table_data.wal_region_id(), + table_location: table_data.location(), sequence: flushed_sequence, })?; @@ -815,7 +819,7 @@ impl SpaceStore { let meta_update = MetaUpdate::VersionEdit(edit_meta.clone()); self.manifest - .store_update(meta_update) + .store_update(MetaUpdateRequest::new(table_data.location(), meta_update)) .await .context(StoreVersionEdit)?; diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index 15110ab1d7..08d6181624 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -7,11 +7,11 @@ use std::{ sync::{Arc, RwLock}, }; -use common_types::schema::IndexInWriterSchema; +use common_types::{schema::IndexInWriterSchema, table::Location}; use log::{debug, error, info, trace, warn}; use object_store::ObjectStoreRef; use snafu::ResultExt; -use table_engine::table::TableId; +use table_engine::engine::OpenTableRequest; use tokio::sync::oneshot; use wal::{ log_batch::LogEntry, @@ -130,12 +130,12 @@ impl Instance { pub async fn do_open_table( self: &Arc, space: SpaceRef, - table_id: TableId, + request: &OpenTableRequest, ) -> Result> { - if let Some(table_data) = space.find_table_by_id(table_id) { + if let Some(table_data) = space.find_table_by_id(request.table_id) { return Ok(Some(table_data)); } - let table_data = match self.recover_table_meta_data(table_id).await? { + let table_data = match self.recover_table_meta_data(request).await? { Some(v) => v, None => return Ok(None), }; @@ -196,33 +196,39 @@ impl Instance { /// Return None if no meta data is found for the table. async fn recover_table_meta_data( self: &Arc, - table_id: TableId, + request: &OpenTableRequest, ) -> Result> { - info!("Instance recover table:{} meta begin", table_id); + info!("Instance recover table:{} meta begin", request.table_id); // Load manifest, also create a new snapshot at startup. let manifest_data = self .space_store .manifest - .load_data(table_id, true) + .load_data( + Location::new(request.shard_id, request.table_id.as_u64()), + true, + ) .await - .context(ReadMetaUpdate { table_id })?; + .context(ReadMetaUpdate { + table_id: request.table_id, + })?; let table_data = if let Some(manifest_data) = manifest_data { - Some(self.apply_table_manifest_data(manifest_data).await?) + Some(self.recover_table_data(manifest_data, request).await?) 
} else { None }; - info!("Instance recover table:{} meta end", table_id); + info!("Instance recover table:{} meta end", request.table_id); Ok(table_data) } - /// Apply manifest data to instance - async fn apply_table_manifest_data( + /// Recover `TableData` by applying manifest data to instance + async fn recover_table_data( self: &Arc, manifest_data: TableManifestData, + request: &OpenTableRequest, ) -> Result { let TableManifestData { table_meta, @@ -243,6 +249,7 @@ impl Instance { write_handle, &self.file_purger, space.mem_usage_collector.clone(), + request.shard_id, ) .context(RecoverTableData { space_id: table_meta.space_id, @@ -273,7 +280,7 @@ impl Instance { read_ctx: &ReadContext, ) -> Result<()> { let read_req = ReadRequest { - region_id: table_data.wal_region_id(), + location: table_data.location(), start: ReadBoundary::Min, end: ReadBoundary::Max, }; diff --git a/analytic_engine/src/instance/write.rs b/analytic_engine/src/instance/write.rs index 1f946f25e8..58f7e0e3e9 100644 --- a/analytic_engine/src/instance/write.rs +++ b/analytic_engine/src/instance/write.rs @@ -8,6 +8,7 @@ use common_types::{ bytes::ByteVec, row::RowGroup, schema::{IndexInWriterSchema, Schema}, + table::Location, }; use common_util::{codec::row, define_result}; use log::{debug, error, info, trace, warn}; @@ -16,7 +17,7 @@ use smallvec::SmallVec; use snafu::{ensure, Backtrace, ResultExt, Snafu}; use table_engine::table::WriteRequest; use tokio::sync::oneshot; -use wal::manager::{RegionId, SequenceNumber, WriteContext}; +use wal::manager::{SequenceNumber, WriteContext}; use crate::{ instance::{ @@ -37,26 +38,26 @@ use crate::{ #[derive(Debug, Snafu)] pub enum Error { #[snafu(display( - "Failed to get to log batch encoder, table:{}, region_id:{}, err:{}", + "Failed to get to log batch encoder, table:{}, table_location:{:?}, err:{}", table, - region_id, + table_location, source ))] GetLogBatchEncoder { table: String, - region_id: RegionId, + table_location: Location, source: wal::manager::Error, }, #[snafu(display( - "Failed to encode payloads, table:{}, region_id:{}, err:{}", + "Failed to encode payloads, table:{}, table_location:{:?}, err:{}", table, - region_id, + table_location, source ))] EncodePayloads { table: String, - region_id: RegionId, + table_location: Location, source: wal::manager::Error, }, @@ -398,18 +399,18 @@ impl Instance { // Encode payload let payload = WritePayload::Write(&write_req_pb); - let region_id = table_data.wal_region_id(); + let region_id = table_data.location(); let log_batch_encoder = self.space_store .wal_manager .encoder(region_id) .context(GetLogBatchEncoder { table: &table_data.name, - region_id: table_data.wal_region_id(), + table_location: table_data.location(), })?; let log_batch = log_batch_encoder.encode(&payload).context(EncodePayloads { table: &table_data.name, - region_id: table_data.wal_region_id(), + table_location: table_data.location(), })?; // Write to wal manager diff --git a/analytic_engine/src/meta/details.rs b/analytic_engine/src/meta/details.rs index b5f591b89d..84b233c6d1 100644 --- a/analytic_engine/src/meta/details.rs +++ b/analytic_engine/src/meta/details.rs @@ -12,11 +12,11 @@ use std::{ }; use async_trait::async_trait; +use common_types::table::Location; use common_util::define_result; use log::{debug, info}; use serde_derive::Deserialize; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; -use table_engine::table::TableId; use tokio::sync::Mutex; use wal::{ log_batch::LogEntry, @@ -29,7 +29,8 @@ use wal::{ use crate::meta::{ 
meta_data::{TableManifestData, TableManifestDataBuilder}, meta_update::{ - MetaUpdate, MetaUpdateDecoder, MetaUpdateLogEntry, MetaUpdatePayload, VersionEditMeta, + MetaUpdate, MetaUpdateDecoder, MetaUpdateLogEntry, MetaUpdatePayload, MetaUpdateRequest, + VersionEditMeta, }, Manifest, }; @@ -37,18 +38,22 @@ use crate::meta::{ #[derive(Debug, Snafu)] pub enum Error { #[snafu(display( - "Failed to get to get log batch encoder, region_id:{}, err:{}", - region_id, + "Failed to get log batch encoder, table_location:{:?}, err:{}", + table_location, source ))] GetLogBatchEncoder { - region_id: RegionId, + table_location: Location, source: wal::manager::Error, }, - #[snafu(display("Failed to encode payloads, region_id:{}, err:{}", region_id, source))] + #[snafu(display( + "Failed to encode payloads, table_location:{:?}, err:{}", + table_location, + source + ))] EncodePayloads { - region_id: RegionId, + table_location: Location, source: wal::manager::Error, }, #[snafu(display("Failed to write update to wal, err:{}", source))] @@ -181,18 +186,19 @@ impl ManifestImpl { Ok(manifest) } - async fn store_update_to_wal(&self, update: MetaUpdate) -> Result { - info!("Manifest store update, update:{:?}", update); + async fn store_update_to_wal(&self, request: MetaUpdateRequest) -> Result { + info!("Manifest store update, request:{:?}", request); - let region_id = Self::region_id_of_meta_update(&update); - let payload: MetaUpdatePayload = MetaUpdateLogEntry::Normal(update).into(); - let log_batch_encoder = self - .wal_manager - .encoder(region_id) - .context(GetLogBatchEncoder { region_id })?; - let log_batch = log_batch_encoder - .encode(&payload) - .context(EncodePayloads { region_id })?; + let payload: MetaUpdatePayload = MetaUpdateLogEntry::Normal(request.meta_update).into(); + let log_batch_encoder = + self.wal_manager + .encoder(request.location) + .context(GetLogBatchEncoder { + table_location: request.location, + })?; + let log_batch = log_batch_encoder.encode(&payload).context(EncodePayloads { + table_location: request.location, + })?; let write_ctx = WriteContext::default(); @@ -205,12 +211,11 @@ impl ManifestImpl { /// Do snapshot if no other snapshot is triggered. /// /// Returns the latest snapshot if snapshot is done.
- async fn maybe_do_snapshot(&self, table_id: TableId) -> Result> { + async fn maybe_do_snapshot(&self, location: Location) -> Result> { if let Ok(_guard) = self.snapshot_write_guard.try_lock() { - let region_id = table_id.as_u64(); let snapshotter = Snapshotter { - region_id, - log_store: RegionWal::new(region_id, self.wal_manager.clone()), + location, + log_store: RegionWal::new(location, self.wal_manager.clone()), }; let snapshot = snapshotter.snapshot().await?; @@ -223,11 +228,6 @@ impl ManifestImpl { Ok(None) } - #[inline] - fn region_id_of_meta_update(update: &MetaUpdate) -> RegionId { - update.table_id().as_u64() - } - // with snapshot guard held fn decrease_num_updates(&self, num: usize) { if num > self.num_updates_since_snapshot.load(Ordering::Relaxed) { @@ -243,10 +243,10 @@ impl ManifestImpl { impl Manifest for ManifestImpl { async fn store_update( &self, - update: MetaUpdate, + request: MetaUpdateRequest, ) -> std::result::Result<(), Box> { - let table_id = update.table_id(); - self.store_update_to_wal(update).await?; + let location = request.location; + self.store_update_to_wal(request).await?; let num_updates = self .num_updates_since_snapshot @@ -255,27 +255,26 @@ impl Manifest for ManifestImpl { return Ok(()); } - self.maybe_do_snapshot(table_id).await?; + self.maybe_do_snapshot(location).await?; Ok(()) } async fn load_data( &self, - table_id: TableId, + location: Location, do_snapshot: bool, ) -> std::result::Result, Box> { - let region_id = table_id.as_u64(); if do_snapshot { - if let Some(snapshot) = self.maybe_do_snapshot(table_id).await? { + if let Some(snapshot) = self.maybe_do_snapshot(location).await? { return Ok(snapshot.data); } } let snapshotter = Snapshotter { - region_id, - log_store: RegionWal::new(region_id, self.wal_manager.clone()), + location, + log_store: RegionWal::new(location, self.wal_manager.clone()), }; let snapshot = snapshotter.create_latest_snapshot().await?; Ok(snapshot.data) @@ -294,14 +293,14 @@ trait MetaUpdateLogStore: std::fmt::Debug { #[derive(Debug, Clone)] struct RegionWal { - region_id: RegionId, + location: Location, wal_manager: WalManagerRef, } impl RegionWal { - fn new(region_id: RegionId, wal_manager: WalManagerRef) -> Self { + fn new(location: Location, wal_manager: WalManagerRef) -> Self { Self { - region_id, + location, wal_manager, } } @@ -314,7 +313,7 @@ impl MetaUpdateLogStore for RegionWal { async fn scan(&self, start: ReadBoundary, end: ReadBoundary) -> Result { let ctx = ReadContext::default(); let read_req = ReadRequest { - region_id: self.region_id, + location: self.location, start, end, }; @@ -331,14 +330,14 @@ impl MetaUpdateLogStore for RegionWal { } async fn store(&self, log_entries: &[MetaUpdateLogEntry]) -> Result<()> { - let region_id = self.region_id; + let table_location = self.location; let log_batch_encoder = self .wal_manager - .encoder(self.region_id) - .context(GetLogBatchEncoder { region_id })?; + .encoder(table_location) + .context(GetLogBatchEncoder { table_location })?; let log_batch = log_batch_encoder .encode_batch::(log_entries) - .context(EncodePayloads { region_id })?; + .context(EncodePayloads { table_location })?; let write_ctx = WriteContext::default(); @@ -352,7 +351,7 @@ impl MetaUpdateLogStore for RegionWal { async fn delete_up_to(&self, inclusive_end: SequenceNumber) -> Result<()> { self.wal_manager - .mark_delete_entries_up_to(self.region_id, inclusive_end) + .mark_delete_entries_up_to(self.location, inclusive_end) .await .context(CleanWal) } @@ -411,7 +410,7 @@ impl MetaUpdateLogStore for 
RegionWal { /// just read all logs and convert them into snapshot. #[derive(Debug, Clone)] struct Snapshotter { - region_id: RegionId, + location: Location, log_store: S, } @@ -447,8 +446,8 @@ impl Snapshotter { async fn snapshot(&self) -> Result { let snapshot = self.create_latest_snapshot().await?; info!( - "Store snapshot to region, region_id:{}, snapshot_end_seq:{}", - self.region_id, snapshot.end_seq, + "Store snapshot to region, location:{:?}, snapshot_end_seq:{}", + self.location, snapshot.end_seq, ); // Delete the expired logs after saving the snapshot. @@ -664,7 +663,13 @@ impl Snapshotter { mod tests { use std::{collections::HashMap, iter::FromIterator, path::PathBuf, sync::Arc, vec}; - use common_types::{column_schema, datum::DatumKind, schema, schema::Schema}; + use common_types::{ + column_schema, + datum::DatumKind, + schema, + schema::Schema, + table::{Location, DEFAULT_SHARD_ID}, + }; use common_util::{runtime, runtime::Runtime, tests::init_log_for_test}; use futures::future::BoxFuture; use table_engine::table::{SchemaId, TableId, TableSeqGenerator}; @@ -767,21 +772,21 @@ mod tests { async fn check_table_manifest_data_with_manifest( &self, - table_id: TableId, + location: Location, expected: &Option, manifest: &ManifestImpl, ) { - let data = manifest.load_data(table_id, false).await.unwrap(); + let data = manifest.load_data(location, false).await.unwrap(); assert_eq!(&data, expected); } async fn check_table_manifest_data( &self, - table_id: TableId, + location: Location, expected: &Option, ) { let manifest = self.open_manifest().await; - self.check_table_manifest_data_with_manifest(table_id, expected, &manifest) + self.check_table_manifest_data_with_manifest(location, expected, &manifest) .await; } @@ -845,8 +850,12 @@ mod tests { manifest_data_builder: &mut TableManifestDataBuilder, manifest: &ManifestImpl, ) { + let location = Location::new(DEFAULT_SHARD_ID, table_id.as_u64()); let add_table = self.meta_update_add_table(table_id); - manifest.store_update(add_table.clone()).await.unwrap(); + manifest + .store_update(MetaUpdateRequest::new(location, add_table.clone())) + .await + .unwrap(); manifest_data_builder.apply_update(add_table).unwrap(); } @@ -856,8 +865,12 @@ mod tests { manifest_data_builder: &mut TableManifestDataBuilder, manifest: &ManifestImpl, ) { + let location = Location::new(DEFAULT_SHARD_ID, table_id.as_u64()); let drop_table = self.meta_update_drop_table(table_id); - manifest.store_update(drop_table.clone()).await.unwrap(); + manifest + .store_update(MetaUpdateRequest::new(location, drop_table.clone())) + .await + .unwrap(); manifest_data_builder.apply_update(drop_table).unwrap(); } @@ -868,8 +881,12 @@ mod tests { manifest_data_builder: &mut TableManifestDataBuilder, manifest: &ManifestImpl, ) { + let location = Location::new(DEFAULT_SHARD_ID, table_id.as_u64()); let version_edit = self.meta_update_version_edit(table_id, flushed_seq); - manifest.store_update(version_edit.clone()).await.unwrap(); + manifest + .store_update(MetaUpdateRequest::new(location, version_edit.clone())) + .await + .unwrap(); manifest_data_builder.apply_update(version_edit).unwrap(); } @@ -911,8 +928,12 @@ mod tests { ) { let manifest = self.open_manifest().await; + let location = Location::new(DEFAULT_SHARD_ID, table_id.as_u64()); let alter_options = self.meta_update_alter_table_options(table_id); - manifest.store_update(alter_options.clone()).await.unwrap(); + manifest + .store_update(MetaUpdateRequest::new(location, alter_options.clone())) + .await + .unwrap(); 
manifest_data_builder.apply_update(alter_options).unwrap(); } @@ -923,8 +944,12 @@ mod tests { ) { let manifest = self.open_manifest().await; + let location = Location::new(DEFAULT_SHARD_ID, table_id.as_u64()); let alter_schema = self.meta_update_alter_table_schema(table_id); - manifest.store_update(alter_schema.clone()).await.unwrap(); + manifest + .store_update(MetaUpdateRequest::new(location, alter_schema.clone())) + .await + .unwrap(); manifest_data_builder.apply_update(alter_schema).unwrap(); } } @@ -944,8 +969,9 @@ mod tests { update_table_meta(&ctx, table_id, &mut manifest_data_builder).await; + let location = Location::new(DEFAULT_SHARD_ID, table_id.as_u64()); let expected_table_manifest_data = manifest_data_builder.build(); - ctx.check_table_manifest_data(table_id, &expected_table_manifest_data) + ctx.check_table_manifest_data(location, &expected_table_manifest_data) .await; }) } @@ -1013,12 +1039,13 @@ mod tests { let runtime = ctx.runtime.clone(); runtime.block_on(async move { let table_id = ctx.alloc_table_id(); + let location = Location::new(DEFAULT_SHARD_ID, table_id.as_u64()); let mut manifest_data_builder = TableManifestDataBuilder::default(); let manifest = ctx.open_manifest().await; ctx.add_table_with_manifest(table_id, &mut manifest_data_builder, &manifest) .await; - manifest.maybe_do_snapshot(table_id).await.unwrap(); + manifest.maybe_do_snapshot(location).await.unwrap(); ctx.version_edit_table_with_manifest( table_id, @@ -1028,7 +1055,7 @@ mod tests { ) .await; ctx.check_table_manifest_data_with_manifest( - table_id, + location, &manifest_data_builder.build(), &manifest, ) @@ -1042,6 +1069,7 @@ mod tests { let runtime = ctx.runtime.clone(); runtime.block_on(async move { let table_id = ctx.alloc_table_id(); + let location = Location::new(DEFAULT_SHARD_ID, table_id.as_u64()); let mut manifest_data_builder = TableManifestDataBuilder::default(); let manifest = ctx.open_manifest().await; ctx.add_table_with_manifest(table_id, &mut manifest_data_builder, &manifest) @@ -1057,13 +1085,13 @@ mod tests { .await; } ctx.check_table_manifest_data_with_manifest( - table_id, + location, &manifest_data_builder.clone().build(), &manifest, ) .await; - manifest.maybe_do_snapshot(table_id).await.unwrap(); + manifest.maybe_do_snapshot(location).await.unwrap(); for i in 500..550 { ctx.version_edit_table_with_manifest( table_id, @@ -1074,7 +1102,7 @@ mod tests { .await; } ctx.check_table_manifest_data_with_manifest( - table_id, + location, &manifest_data_builder.build(), &manifest, ) @@ -1186,6 +1214,7 @@ mod tests { manifest_builder.build() }; + let location = Location::new(DEFAULT_SHARD_ID, table_id.as_u64()); let log_store = { let log_entries: Vec<_> = logs .iter() @@ -1196,7 +1225,7 @@ mod tests { ctx.runtime.block_on(async move { let snapshotter = Snapshotter { - region_id: table_id.as_u64(), + location, log_store, }; diff --git a/analytic_engine/src/meta/meta_update.rs b/analytic_engine/src/meta/meta_update.rs index eac8f86f59..8924ab244b 100644 --- a/analytic_engine/src/meta/meta_update.rs +++ b/analytic_engine/src/meta/meta_update.rs @@ -7,6 +7,7 @@ use std::convert::{TryFrom, TryInto}; use common_types::{ bytes::{MemBuf, MemBufMut, Writer}, schema::{Schema, Version}, + table::Location, SequenceNumber, }; use common_util::define_result; @@ -462,3 +463,18 @@ impl PayloadDecoder for MetaUpdateDecoder { Ok(log_entry) } } + +#[derive(Debug, Clone)] +pub struct MetaUpdateRequest { + pub location: Location, + pub meta_update: MetaUpdate, +} + +impl MetaUpdateRequest { + pub fn new(location: 
Location, meta_update: MetaUpdate) -> Self { + Self { + location, + meta_update, + } + } +} diff --git a/analytic_engine/src/meta/mod.rs b/analytic_engine/src/meta/mod.rs index 1af1a84bb0..4096744c40 100644 --- a/analytic_engine/src/meta/mod.rs +++ b/analytic_engine/src/meta/mod.rs @@ -9,10 +9,9 @@ pub mod meta_update; use std::{fmt, sync::Arc}; use async_trait::async_trait; -use meta_update::MetaUpdate; -use table_engine::table::TableId; +use common_types::table::Location; -use crate::meta::meta_data::TableManifestData; +use crate::meta::{meta_data::TableManifestData, meta_update::MetaUpdateRequest}; /// Manifest holds meta data of all tables. #[async_trait] @@ -20,7 +19,7 @@ pub trait Manifest: Send + Sync + fmt::Debug { /// Store update to manifest async fn store_update( &self, - update: MetaUpdate, + request: MetaUpdateRequest, ) -> std::result::Result<(), Box>; /// Load table meta data from manifest. @@ -29,7 +28,7 @@ pub trait Manifest: Send + Sync + fmt::Debug { /// the manifest data. async fn load_data( &self, - table_id: TableId, + location: Location, do_snapshot: bool, ) -> Result, Box>; } diff --git a/analytic_engine/src/table/data.rs b/analytic_engine/src/table/data.rs index 081cfbfaf4..a2ab694289 100644 --- a/analytic_engine/src/table/data.rs +++ b/analytic_engine/src/table/data.rs @@ -18,6 +18,7 @@ use arc_swap::ArcSwap; use arena::CollectorRef; use common_types::{ schema::{Schema, Version}, + table::{Location, ShardId}, time::{TimeRange, Timestamp}, SequenceNumber, }; @@ -26,7 +27,6 @@ use log::{debug, info}; use object_store::Path; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::{engine::CreateTableRequest, table::TableId}; -use wal::manager::RegionId; use crate::{ instance::write_worker::{WorkerLocal, WriteHandle}, @@ -134,6 +134,9 @@ pub struct TableData { /// Metrics of this table. 
pub metrics: Metrics, + + /// Shard id + pub shard_id: ShardId, } impl fmt::Debug for TableData { @@ -149,6 +152,7 @@ impl fmt::Debug for TableData { .field("last_memtable_id", &self.last_memtable_id) .field("last_file_id", &self.last_file_id) .field("dropped", &self.dropped.load(Ordering::Relaxed)) + .field("shard_id", &self.shard_id) .finish() } } @@ -204,6 +208,7 @@ impl TableData { last_flush_time_ms: AtomicU64::new(0), dropped: AtomicBool::new(false), metrics, + shard_id: request.shard_id, }) } @@ -215,6 +220,7 @@ impl TableData { write_handle: WriteHandle, purger: &FilePurger, mem_usage_collector: CollectorRef, + shard_id: ShardId, ) -> Result { let memtable_factory = Arc::new(SkiplistMemTableFactory); let purge_queue = purger.create_purge_queue(add_meta.space_id, add_meta.table_id); @@ -240,6 +246,7 @@ impl TableData { last_flush_time_ms: AtomicU64::new(0), dropped: AtomicBool::new(false), metrics, + shard_id, }) } @@ -268,8 +275,8 @@ impl TableData { /// /// Now we just use table id as region id #[inline] - pub fn wal_region_id(&self) -> RegionId { - self.id.as_u64() + pub fn shard_id(&self) -> ShardId { + self.shard_id } /// Get last sequence number @@ -490,6 +497,10 @@ impl TableData { pub fn storage_format(&self) -> StorageFormat { self.table_options().storage_format } + + pub fn location(&self) -> Location { + Location::new(self.shard_id, self.id.as_u64()) + } } /// Table data reference @@ -581,7 +592,7 @@ pub mod tests { use std::sync::Arc; use arena::NoopCollector; - use common_types::datum::DatumKind; + use common_types::{datum::DatumKind, table::DEFAULT_SHARD_ID}; use common_util::config::ReadableDuration; use table_engine::{engine::TableState, table::SchemaId}; @@ -626,6 +637,7 @@ pub mod tests { pub struct TableDataMocker { table_id: TableId, table_name: String, + shard_id: ShardId, write_handle: Option, } @@ -640,6 +652,11 @@ pub mod tests { self } + pub fn shard_id(mut self, shard_id: ShardId) -> Self { + self.shard_id = shard_id; + self + } + pub fn write_handle(mut self, write_handle: WriteHandle) -> Self { self.write_handle = Some(write_handle); self @@ -659,6 +676,7 @@ pub mod tests { engine: table_engine::ANALYTIC_ENGINE_TYPE.to_string(), options: HashMap::new(), state: TableState::Stable, + shard_id: self.shard_id, }; let write_handle = self.write_handle.unwrap_or_else(|| { @@ -686,6 +704,7 @@ pub mod tests { Self { table_id: table::new_table_id(2, 1), table_name: "mocked_table".to_string(), + shard_id: DEFAULT_SHARD_ID, write_handle: None, } } @@ -695,14 +714,16 @@ pub mod tests { fn test_new_table_data() { let table_id = table::new_table_id(100, 30); let table_name = "new_table".to_string(); + let shard_id = 42; let table_data = TableDataMocker::default() .table_id(table_id) .table_name(table_name.clone()) + .shard_id(shard_id) .build(); assert_eq!(table_id, table_data.id); assert_eq!(table_name, table_data.name); - assert_eq!(table_data.id.as_u64(), table_data.wal_region_id()); + assert_eq!(shard_id, table_data.shard_id); assert_eq!(0, table_data.last_sequence()); assert!(!table_data.is_dropped()); assert_eq!(0, table_data.last_file_id()); diff --git a/analytic_engine/src/tests/table.rs b/analytic_engine/src/tests/table.rs index 63d88b8d51..8e18998204 100644 --- a/analytic_engine/src/tests/table.rs +++ b/analytic_engine/src/tests/table.rs @@ -12,6 +12,7 @@ use common_types::{ request_id::RequestId, row::{Row, RowGroup, RowGroupBuilder}, schema::{self, Schema}, + table::DEFAULT_SHARD_ID, time::Timestamp, }; use common_util::config::ReadableDuration; @@ -306,6 
+307,7 @@ impl Default for Builder { engine: table_engine::ANALYTIC_ENGINE_TYPE.to_string(), options: HashMap::new(), state: TableState::Stable, + shard_id: DEFAULT_SHARD_ID, }, } } diff --git a/analytic_engine/src/tests/util.rs b/analytic_engine/src/tests/util.rs index f60c4d170c..03aad43434 100644 --- a/analytic_engine/src/tests/util.rs +++ b/analytic_engine/src/tests/util.rs @@ -8,6 +8,7 @@ use common_types::{ datum::Datum, record_batch::RecordBatch, row::{Row, RowGroup}, + table::DEFAULT_SHARD_ID, time::Timestamp, }; use common_util::{config::ReadableDuration, runtime}; @@ -169,6 +170,7 @@ impl TestContext { table_name: table_name.to_string(), table_id, engine: table_engine::ANALYTIC_ENGINE_TYPE.to_string(), + shard_id: DEFAULT_SHARD_ID, }) .await .unwrap() @@ -191,6 +193,7 @@ impl TestContext { table_name: table_name.to_string(), table_id, engine: table_engine::ANALYTIC_ENGINE_TYPE.to_string(), + shard_id: DEFAULT_SHARD_ID, }) .await?; diff --git a/analytic_engine/src/wal_synchronizer.rs b/analytic_engine/src/wal_synchronizer.rs index 42c94c804e..2b24a0d0fb 100644 --- a/analytic_engine/src/wal_synchronizer.rs +++ b/analytic_engine/src/wal_synchronizer.rs @@ -11,7 +11,7 @@ use std::{ time::Duration, }; -use common_types::SequenceNumber; +use common_types::{table::Location, SequenceNumber}; use common_util::{ define_result, runtime::{JoinHandle, Runtime}, @@ -30,7 +30,7 @@ use wal::{ log_batch::LogEntry, manager::{ BatchLogIterator, BatchLogIteratorAdapter, ReadBoundary, ReadContext, ReadRequest, - RegionId, WalManagerRef, + WalManagerRef, }, }; @@ -132,8 +132,8 @@ impl WalSynchronizer { } #[allow(dead_code)] - pub async fn register_table(&self, region_id: RegionId, table: ReaderTable) { - self.inner.register_table(region_id, table).await; + pub async fn register_table(&self, location: Location, table: ReaderTable) { + self.inner.register_table(location, table).await; } pub async fn start(&mut self, runtime: &Runtime) { @@ -149,18 +149,18 @@ impl WalSynchronizer { pub struct Inner { wal: WalManagerRef, config: WalSynchronizerConfig, - tables: RwLock>, + tables: RwLock>, } impl Inner { #[allow(dead_code)] - pub async fn register_table(&self, region_id: RegionId, table: ReaderTable) { + pub async fn register_table(&self, location: Location, table: ReaderTable) { let state = SynchronizeState { - region_id, + location, table, last_synced_seq: AtomicU64::new(SequenceNumber::MIN), }; - self.tables.write().await.insert(region_id, state); + self.tables.write().await.insert(location, state); } pub async fn start_synchronize(self: Arc, mut stop_listener: Receiver<()>) { @@ -173,7 +173,7 @@ impl Inner { }; loop { - let mut invalid_regions = vec![]; + let mut invalid_tables = vec![]; let tables = self.tables.read().await; // todo: consider clone [SynchronizeState] out to release the read lock. let states = tables.values().collect::>(); @@ -182,7 +182,7 @@ impl Inner { for state in states { // check state before polling WAL if !state.check_state() { - invalid_regions.push(state.region_id); + invalid_tables.push(state.location); continue; } @@ -204,7 +204,7 @@ impl Inner { // double check state before writing to table. Error due to // state changed after this check will be treat as normal error. 
if !state.check_state() { - invalid_regions.push(state.region_id); + invalid_tables.push(state.location); continue; } @@ -215,7 +215,7 @@ impl Inner { } drop(tables); - self.purge_invalid_region(&mut invalid_regions).await; + self.purge_invalid_tables(&mut invalid_tables).await; if time::timeout(self.config.interval, stop_listener.recv()) .await @@ -287,26 +287,26 @@ impl Inner { Ok(()) } - /// Remove invalid regions from poll list. This function will clear the - /// `invalid_region` vec. - async fn purge_invalid_region(&self, invalid_regions: &mut Vec) { - if invalid_regions.is_empty() { + /// Remove invalid tables from poll list. This function will clear the + /// `invalid_table` vec. + async fn purge_invalid_tables(&self, invalid_tables: &mut Vec) { + if invalid_tables.is_empty() { return; } debug!( "Removing invalid region from WAL Synchronizer: {:?}", - invalid_regions + invalid_tables ); let mut tables = self.tables.write().await; - for region in invalid_regions.drain(..) { - tables.remove(®ion); + for location in invalid_tables.drain(..) { + tables.remove(&location); } } } struct SynchronizeState { - region_id: RegionId, + location: Location, table: ReaderTable, /// Atomic version of [SequenceNumber] last_synced_seq: AtomicU64, @@ -315,7 +315,7 @@ struct SynchronizeState { impl SynchronizeState { pub fn read_req(&self) -> ReadRequest { ReadRequest { - region_id: self.region_id, + location: self.location, start: ReadBoundary::Excluded(self.last_synced_seq.load(Ordering::Relaxed)), end: ReadBoundary::Max, } diff --git a/benchmarks/src/wal_write_bench.rs b/benchmarks/src/wal_write_bench.rs index c0ae88949d..5204f68120 100644 --- a/benchmarks/src/wal_write_bench.rs +++ b/benchmarks/src/wal_write_bench.rs @@ -4,6 +4,7 @@ use std::sync::Arc; +use common_types::table::Location; use common_util::runtime::Runtime; use rand::prelude::*; use table_kv::memory::MemoryImpl; @@ -75,7 +76,7 @@ impl WalWriteBench { let values = self.build_value_vec(); let wal_encoder = wal - .encoder(1) + .encoder(Location::new(1, 1)) .expect("should succeed to create wal encoder"); let log_batch = wal_encoder .encode_batch::>(values.as_slice()) diff --git a/catalog/src/schema.rs b/catalog/src/schema.rs index 66726c33ac..28148bd7ba 100644 --- a/catalog/src/schema.rs +++ b/catalog/src/schema.rs @@ -5,7 +5,7 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; -use common_types::column_schema::ColumnSchema; +use common_types::{column_schema::ColumnSchema, table::DEFAULT_SHARD_ID}; use snafu::{Backtrace, Snafu}; use table_engine::{ engine::{self, TableEngineRef, TableState}, @@ -171,6 +171,7 @@ impl CreateTableRequest { engine: self.engine, options: self.options, state: self.state, + shard_id: DEFAULT_SHARD_ID, } } } diff --git a/common_types/src/table.rs b/common_types/src/table.rs index 2528c1247f..2f54491c74 100644 --- a/common_types/src/table.rs +++ b/common_types/src/table.rs @@ -2,3 +2,17 @@ pub type TableId = u64; pub type TableName = String; +pub type ShardId = u32; +pub const DEFAULT_SHARD_ID: u32 = 0; + +#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, Ord, PartialOrd)] +pub struct Location { + pub shard_id: ShardId, + pub table_id: TableId, +} + +impl Location { + pub fn new(shard_id: ShardId, table_id: TableId) -> Self { + Self { shard_id, table_id } + } +} diff --git a/server/src/table_engine.rs b/server/src/table_engine.rs index 1def3a5623..8c8cfaeef3 100644 --- a/server/src/table_engine.rs +++ b/server/src/table_engine.rs @@ -99,7 +99,7 @@ impl TableEngine for TableEngineProxy { 
} /// Close table, it is ok to close a closed table. - async fn close_table(&self, request: OpenTableRequest) -> Result<()> { + async fn close_table(&self, request: CloseTableRequest) -> Result<()> { match request.engine.as_str() { MEMORY_ENGINE_TYPE => self.memory.close_table(request).await, ANALYTIC_ENGINE_TYPE => self.analytic.close_table(request).await, diff --git a/system_catalog/src/sys_catalog_table.rs b/system_catalog/src/sys_catalog_table.rs index 402bb7a33e..930bc9b0b3 100644 --- a/system_catalog/src/sys_catalog_table.rs +++ b/system_catalog/src/sys_catalog_table.rs @@ -15,6 +15,7 @@ use common_types::{ request_id::RequestId, row::{Row, RowGroup, RowGroupBuilder}, schema::{self, Schema}, + table::DEFAULT_SHARD_ID, time::Timestamp, }; use common_util::{ @@ -273,6 +274,7 @@ impl SysCatalogTable { table_name: SYS_CATALOG_TABLE_NAME.to_string(), table_id: SYS_CATALOG_TABLE_ID, engine: table_engine.engine_type().to_string(), + shard_id: DEFAULT_SHARD_ID, }; let table_opt = table_engine @@ -312,6 +314,7 @@ impl SysCatalogTable { engine: table_engine.engine_type().to_string(), options, state: TableState::Stable, + shard_id: DEFAULT_SHARD_ID, }; let table = table_engine diff --git a/table_engine/src/engine.rs b/table_engine/src/engine.rs index 7b8b56c0fe..5c2f786add 100644 --- a/table_engine/src/engine.rs +++ b/table_engine/src/engine.rs @@ -5,7 +5,11 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; -use common_types::{schema::Schema, time::Timestamp}; +use common_types::{ + schema::Schema, + table::{ShardId, DEFAULT_SHARD_ID}, + time::Timestamp, +}; use common_util::runtime::Runtime; use proto::sys_catalog::{TableEntry, TableState as TableStatePb}; use snafu::{ensure, Backtrace, Snafu}; @@ -155,6 +159,10 @@ pub struct CreateTableRequest { pub options: HashMap, /// Tells state of the table pub state: TableState, + /// Shard id; a shard is a set of tables scheduled together across nodes. + /// It is assigned the default value in standalone mode + /// and is only meaningful in cluster mode. + pub shard_id: ShardId, } impl CreateTableRequest { @@ -226,9 +234,14 @@ pub struct OpenTableRequest { pub table_id: TableId, /// Table engine type pub engine: String, + /// Shard id; a shard is a set of tables scheduled together across nodes. + pub shard_id: ShardId, } impl From for OpenTableRequest { + /// The `shard_id` is not persisted, so it is assigned the default value + /// when recovering from `TableInfo`. + /// This conversion only happens in standalone mode.
fn from(table_info: TableInfo) -> Self { Self { catalog_name: table_info.catalog_name, @@ -237,11 +250,26 @@ impl From for OpenTableRequest { table_name: table_info.table_name, table_id: table_info.table_id, engine: table_info.engine, + shard_id: DEFAULT_SHARD_ID, } } } -pub type CloseTableRequest = OpenTableRequest; +#[derive(Debug, Clone)] +pub struct CloseTableRequest { + /// Catalog name + pub catalog_name: String, + /// Schema name + pub schema_name: String, + /// Schema id + pub schema_id: SchemaId, + /// Table name + pub table_name: String, + /// Table id + pub table_id: TableId, + /// Table engine type + pub engine: String, +} /// Table engine // TODO(yingwen): drop table support to release resource owned by the table diff --git a/wal/src/kv_encoder.rs b/wal/src/kv_encoder.rs index 99d1615271..af92d3f3d5 100644 --- a/wal/src/kv_encoder.rs +++ b/wal/src/kv_encoder.rs @@ -4,6 +4,7 @@ use common_types::{ bytes::{self, BytesMut, MemBuf, MemBufMut}, + table::Location, SequenceNumber, }; use common_util::{ @@ -116,6 +117,7 @@ pub enum Namespace { Log = 1, } +/// Log key in old wal design, map the `TableId` to `RegionId` pub type LogKey = (RegionId, SequenceNumber); #[derive(Debug, Clone)] @@ -522,22 +524,22 @@ impl LogEncoding { /// LogBatchEncoder which are used to encode specify payloads. #[derive(Debug)] pub struct LogBatchEncoder { - region_id: RegionId, + location: Location, log_encoding: LogEncoding, } impl LogBatchEncoder { /// Create LogBatchEncoder with specific region_id. - pub fn create(region_id: RegionId) -> Self { + pub fn create(location: Location) -> Self { Self { - region_id, + location, log_encoding: LogEncoding::newest(), } } /// Consume LogBatchEncoder and encode single payload to LogWriteBatch. pub fn encode(self, payload: &impl Payload) -> manager::Result { - let mut write_batch = LogWriteBatch::new(self.region_id); + let mut write_batch = LogWriteBatch::new(self.location); let mut buf = BytesMut::new(); self.log_encoding .encode_value(&mut buf, payload) @@ -561,7 +563,7 @@ impl LogBatchEncoder { where &'a I: Into
<P>
, { - let mut write_batch = LogWriteBatch::new(self.region_id); + let mut write_batch = LogWriteBatch::new(self.location); let mut buf = BytesMut::new(); for raw_payload in raw_payload_batch.iter() { self.log_encoding diff --git a/wal/src/log_batch.rs b/wal/src/log_batch.rs index 5a3416e952..49dc7f6d59 100644 --- a/wal/src/log_batch.rs +++ b/wal/src/log_batch.rs @@ -6,11 +6,10 @@ use std::fmt::Debug; use common_types::{ bytes::{MemBuf, MemBufMut}, + table::Location, SequenceNumber, }; -use crate::manager::RegionId; - pub trait Payload: Send + Sync + Debug { type Error: std::error::Error + Send + Sync + 'static; @@ -35,18 +34,18 @@ pub struct LogWriteEntry { /// A batch of `LogWriteEntry`s. #[derive(Debug)] pub struct LogWriteBatch { - pub(crate) region_id: RegionId, + pub(crate) location: Location, pub(crate) entries: Vec, } impl LogWriteBatch { - pub fn new(region_id: RegionId) -> Self { - Self::with_capacity(region_id, 0) + pub fn new(location: Location) -> Self { + Self::with_capacity(location, 0) } - pub fn with_capacity(region_id: RegionId, cap: usize) -> Self { + pub fn with_capacity(location: Location, cap: usize) -> Self { Self { - region_id, + location, entries: Vec::with_capacity(cap), } } @@ -72,12 +71,6 @@ impl LogWriteBatch { } } -impl Default for LogWriteBatch { - fn default() -> Self { - Self::new(0) - } -} - pub trait PayloadDecoder: Send + Sync { type Error: std::error::Error + Send + Sync + 'static; type Target: Send + Sync; diff --git a/wal/src/manager.rs b/wal/src/manager.rs index 00a2e9e9d4..7aeafd3107 100644 --- a/wal/src/manager.rs +++ b/wal/src/manager.rs @@ -6,7 +6,9 @@ use std::{collections::VecDeque, fmt, sync::Arc, time::Duration}; use async_trait::async_trait; pub use common_types::SequenceNumber; +use common_types::{table::Location, MAX_SEQUENCE_NUMBER, MIN_SEQUENCE_NUMBER}; use common_util::runtime::Runtime; +pub use error::*; use snafu::ResultExt; use crate::{ @@ -16,11 +18,10 @@ use crate::{ }; pub mod error { + use common_types::table::Location; use common_util::define_result; use snafu::{Backtrace, Snafu}; - use crate::manager::RegionId; - // Now most error from manage implementation don't have backtrace, so we add // backtrace here. #[derive(Debug, Snafu)] @@ -45,12 +46,12 @@ pub mod error { }, #[snafu(display( - "Region is not found, region_id:{}.\nBacktrace:\n{}", - region_id, + "Region is not found, table_location:{:?}.\nBacktrace:\n{}", + location, backtrace ))] RegionNotFound { - region_id: RegionId, + location: Location, backtrace: Backtrace, }, @@ -119,9 +120,6 @@ pub mod error { define_result!(Error); } -use common_types::{MAX_SEQUENCE_NUMBER, MIN_SEQUENCE_NUMBER}; -pub use error::*; - pub type RegionId = u64; pub const MAX_REGION_ID: RegionId = u64::MAX; @@ -207,8 +205,8 @@ impl ReadBoundary { #[derive(Debug, Clone)] pub struct ReadRequest { - /// Region id of the wal to read - pub region_id: RegionId, + /// Location of the wal to read + pub location: Location, // TODO(yingwen): Or just rename to ReadBound? /// Start bound pub start: ReadBoundary, @@ -216,6 +214,14 @@ pub struct ReadRequest { pub end: ReadBoundary, } +#[derive(Debug, Clone)] +pub struct ScanRequest { + /// Region id of the wals to be scanned + pub region_id: RegionId, +} + +pub type ScanContext = ReadContext; + /// Blocking Iterator abstraction for log entry. pub trait BlockingLogIterator: Send + fmt::Debug { /// Fetch next log entry from the iterator. 
@@ -246,13 +252,13 @@ pub trait BatchLogIterator { #[async_trait] pub trait WalManager: Send + Sync + fmt::Debug + 'static { /// Get current sequence number. - async fn sequence_num(&self, region_id: RegionId) -> Result; + async fn sequence_num(&self, location: Location) -> Result; /// Mark the entries whose sequence number is in [0, `sequence_number`] to /// be deleted in the future. async fn mark_delete_entries_up_to( &self, - region_id: RegionId, + location: Location, sequence_num: SequenceNumber, ) -> Result<()>; @@ -267,14 +273,17 @@ pub trait WalManager: Send + Sync + fmt::Debug + 'static { ) -> Result; /// Provide the encoder for encoding payloads. - fn encoder(&self, region_id: RegionId) -> Result { - Ok(LogBatchEncoder::create(region_id)) + fn encoder(&self, location: Location) -> Result { + Ok(LogBatchEncoder::create(location)) } /// Write a batch of log entries to log. /// /// Returns the max sequence number for the batch of log entries. async fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch) -> Result; + + /// Scan all logs from a `Region`. + async fn scan(&self, ctx: &ScanContext, req: &ScanRequest) -> Result; } /// Adapter to convert a blocking interator to a batch async iterator. diff --git a/wal/src/rocks_impl/manager.rs b/wal/src/rocks_impl/manager.rs index 054137e19c..7689815863 100644 --- a/wal/src/rocks_impl/manager.rs +++ b/wal/src/rocks_impl/manager.rs @@ -14,7 +14,9 @@ use std::{ }; use async_trait::async_trait; -use common_types::{bytes::BytesMut, SequenceNumber, MAX_SEQUENCE_NUMBER, MIN_SEQUENCE_NUMBER}; +use common_types::{ + bytes::BytesMut, table::Location, SequenceNumber, MAX_SEQUENCE_NUMBER, MIN_SEQUENCE_NUMBER, +}; use common_util::runtime::Runtime; use log::{debug, info, warn}; use rocksdb::{DBIterator, DBOptions, ReadOptions, SeekKey, Writable, WriteBatch, DB}; @@ -26,7 +28,7 @@ use crate::{ log_batch::{LogEntry, LogWriteBatch}, manager::{ error::*, BatchLogIteratorAdapter, BlockingLogIterator, ReadContext, ReadRequest, RegionId, - WalManager, WriteContext, MAX_REGION_ID, + ScanContext, ScanRequest, WalManager, WriteContext, MAX_REGION_ID, }, }; @@ -178,7 +180,7 @@ impl Region { for entry in &batch.entries { self.log_encoding - .encode_key(&mut key_buf, &(batch.region_id, next_sequence_num)) + .encode_key(&mut key_buf, &(batch.location.table_id, next_sequence_num)) .map_err(|e| Box::new(e) as _) .context(Encoding)?; wb.put(&key_buf, &entry.payload) @@ -599,8 +601,8 @@ impl BlockingLogIterator for RocksLogIterator { #[async_trait] impl WalManager for RocksImpl { - async fn sequence_num(&self, region_id: RegionId) -> Result { - if let Some(region) = self.region(region_id) { + async fn sequence_num(&self, location: Location) -> Result { + if let Some(region) = self.region(location.table_id) { return region.sequence_num(); } @@ -609,10 +611,10 @@ impl WalManager for RocksImpl { async fn mark_delete_entries_up_to( &self, - region_id: RegionId, + location: Location, sequence_num: SequenceNumber, ) -> Result<()> { - if let Some(region) = self.region(region_id) { + if let Some(region) = self.region(location.table_id) { return region.delete_entries_up_to(sequence_num).await; } @@ -630,7 +632,7 @@ impl WalManager for RocksImpl { ctx: &ReadContext, req: &ReadRequest, ) -> Result { - let blocking_iter = if let Some(region) = self.region(req.region_id) { + let blocking_iter = if let Some(region) = self.region(req.location.table_id) { region.read(ctx, req)? 
} else { let iter = DBIterator::new(self.db.clone(), ReadOptions::default()); @@ -646,9 +648,17 @@ impl WalManager for RocksImpl { } async fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch) -> Result { - let region = self.get_or_create_region(batch.region_id); + let region = self.get_or_create_region(batch.location.table_id); region.write(ctx, batch).await } + + async fn scan( + &self, + _ctx: &ScanContext, + _req: &ScanRequest, + ) -> Result { + todo!() + } } impl fmt::Debug for RocksImpl { diff --git a/wal/src/table_kv_impl/encoding.rs b/wal/src/table_kv_impl/encoding.rs index f9dd59147e..7a78b9b0c6 100644 --- a/wal/src/table_kv_impl/encoding.rs +++ b/wal/src/table_kv_impl/encoding.rs @@ -92,7 +92,7 @@ mod tests { use std::time::Duration; use super::*; - use crate::table_kv_impl::namespace; + use crate::{manager::RegionId, table_kv_impl::namespace}; #[test] fn test_format_namespace_key() { diff --git a/wal/src/table_kv_impl/namespace.rs b/wal/src/table_kv_impl/namespace.rs index d3e8d2dfff..b60195ae0f 100644 --- a/wal/src/table_kv_impl/namespace.rs +++ b/wal/src/table_kv_impl/namespace.rs @@ -550,7 +550,7 @@ impl NamespaceInner { ctx: &manager::WriteContext, batch: &LogWriteBatch, ) -> Result { - let region_id = batch.region_id; + let region_id = batch.location.table_id; let now = Timestamp::now(); // Get current bucket to write. let bucket = self.get_or_create_bucket(now)?; @@ -584,8 +584,8 @@ impl NamespaceInner { // buckets. let buckets = self.list_buckets(); - let region_id = req.region_id; - if let Some(region) = self.get_or_open_region(req.region_id).await? { + let region_id = req.location.table_id; + if let Some(region) = self.get_or_open_region(region_id).await? { region .read_log(&self.table_kv, buckets, ctx, req) .await @@ -1309,7 +1309,10 @@ fn start_bucket_monitor( mod tests { use std::sync::Arc; - use common_types::bytes::BytesMut; + use common_types::{ + bytes::BytesMut, + table::{Location, DEFAULT_SHARD_ID}, + }; use common_util::runtime::{Builder, Runtime}; use table_kv::{memory::MemoryImpl, KeyBoundary, ScanContext, ScanRequest}; @@ -1611,12 +1614,16 @@ mod tests { runtime.block_on(async { let namespace = NamespaceMocker::new(table_kv.clone(), runtime.clone()).build(); - let region_id = 123; + let table_id = 123; + let location = Location::new(DEFAULT_SHARD_ID, table_id); - let seq1 = write_test_payloads(&namespace, region_id, 1000, 1004).await; - write_test_payloads(&namespace, region_id, 1005, 1009).await; + let seq1 = write_test_payloads(&namespace, location, 1000, 1004).await; + write_test_payloads(&namespace, location, 1005, 1009).await; - namespace.delete_entries(region_id, seq1).await.unwrap(); + namespace + .delete_entries(location.table_id, seq1) + .await + .unwrap(); let inner = &namespace.inner; log_cleaner_routine(inner.clone()).await; @@ -1624,8 +1631,8 @@ mod tests { let buckets = inner.list_buckets(); assert_eq!(1, buckets.len()); - let table = buckets[0].wal_shard_table(region_id); - let key_values = direct_read_logs_from_table(&table_kv, table, region_id).await; + let table = buckets[0].wal_shard_table(location.table_id); + let key_values = direct_read_logs_from_table(&table_kv, table, location.table_id).await; // Logs from min sequence to seq1 should be deleted from the table. 
let mut expect_seq = seq1 + 1; @@ -1689,7 +1696,7 @@ mod tests { async fn write_test_payloads( namespace: &Namespace, - region_id: RegionId, + location: Location, start_sequence: u32, end_sequence: u32, ) -> SequenceNumber { @@ -1700,7 +1707,7 @@ mod tests { } let log_entries = (start_sequence..end_sequence).collect::>(); - let wal_encoder = LogBatchEncoder::create(region_id); + let wal_encoder = LogBatchEncoder::create(location); let log_batch = wal_encoder .encode_batch::(&log_entries) .expect("should succeed to encode payload batch"); diff --git a/wal/src/table_kv_impl/region.rs b/wal/src/table_kv_impl/region.rs index 51c3e6d35f..8120a93f73 100644 --- a/wal/src/table_kv_impl/region.rs +++ b/wal/src/table_kv_impl/region.rs @@ -799,7 +799,7 @@ impl RegionWriter { debug!( "Wal region begin writing, ctx:{:?}, region_id:{}, log_entries_num:{}", ctx, - log_batch.region_id, + log_batch.location.table_id, log_batch.entries.len() ); @@ -812,7 +812,10 @@ impl RegionWriter { for entry in &log_batch.entries { log_encoding - .encode_key(&mut key_buf, &(log_batch.region_id, next_sequence_num)) + .encode_key( + &mut key_buf, + &(log_batch.location.table_id, next_sequence_num), + ) .context(LogCodec)?; wb.insert(&key_buf, &entry.payload); @@ -824,7 +827,7 @@ impl RegionWriter { let table_kv = table_kv.clone(); let bucket = bucket.clone(); - let region_id = log_batch.region_id; + let region_id = log_batch.location.table_id; runtime .spawn_blocking(move || { let table_name = bucket.wal_shard_table(region_id); diff --git a/wal/src/table_kv_impl/wal.rs b/wal/src/table_kv_impl/wal.rs index 501bb519fc..f5868c0b22 100644 --- a/wal/src/table_kv_impl/wal.rs +++ b/wal/src/table_kv_impl/wal.rs @@ -5,7 +5,7 @@ use std::{fmt, str, sync::Arc}; use async_trait::async_trait; -use common_types::SequenceNumber; +use common_types::{table::Location, SequenceNumber}; use log::info; use snafu::ResultExt; use table_kv::TableKv; @@ -13,7 +13,8 @@ use table_kv::TableKv; use crate::{ log_batch::LogWriteBatch, manager::{ - self, error::*, BatchLogIteratorAdapter, ReadContext, ReadRequest, RegionId, WalManager, + self, error::*, BatchLogIteratorAdapter, ReadContext, ReadRequest, ScanContext, + ScanRequest, WalManager, }, table_kv_impl::{ model::NamespaceConfig, @@ -98,9 +99,9 @@ impl fmt::Debug for WalNamespaceImpl { #[async_trait] impl WalManager for WalNamespaceImpl { - async fn sequence_num(&self, region_id: RegionId) -> Result { + async fn sequence_num(&self, location: Location) -> Result { self.namespace - .last_sequence(region_id) + .last_sequence(location.table_id) .await .map_err(|e| Box::new(e) as _) .context(Read) @@ -108,11 +109,11 @@ impl WalManager for WalNamespaceImpl { async fn mark_delete_entries_up_to( &self, - region_id: RegionId, + location: Location, sequence_num: SequenceNumber, ) -> Result<()> { self.namespace - .delete_entries(region_id, sequence_num) + .delete_entries(location.table_id, sequence_num) .await .map_err(|e| Box::new(e) as _) .context(Delete) @@ -158,4 +159,12 @@ impl WalManager for WalNamespaceImpl { .map_err(|e| Box::new(e) as _) .context(Write) } + + async fn scan( + &self, + _ctx: &ScanContext, + _req: &ScanRequest, + ) -> Result { + todo!() + } } diff --git a/wal/src/tests/read_write.rs b/wal/src/tests/read_write.rs index 8e90f61403..cb8f02bdca 100644 --- a/wal/src/tests/read_write.rs +++ b/wal/src/tests/read_write.rs @@ -2,7 +2,10 @@ use std::{ops::Deref, sync::Arc}; -use common_types::SequenceNumber; +use common_types::{ + table::{Location, DEFAULT_SHARD_ID}, + SequenceNumber, +}; use 
crate::{ manager::{ReadBoundary, ReadRequest, RegionId, WalManagerRef}, @@ -29,12 +32,12 @@ async fn check_write_batch_with_read_request( async fn check_write_batch( env: &TestEnv, wal: WalManagerRef, - region_id: RegionId, + location: Location, max_seq: SequenceNumber, payload_batch: &[TestPayload], ) { let read_req = ReadRequest { - region_id, + location, start: ReadBoundary::Included(max_seq + 1 - payload_batch.len() as u64), end: ReadBoundary::Included(max_seq), }; @@ -44,26 +47,26 @@ async fn check_write_batch( async fn simple_read_write_with_wal( env: impl Deref>, wal: WalManagerRef, - region_id: RegionId, + location: Location, ) { - let (payload_batch, write_batch) = env.build_log_batch(wal.clone(), region_id, 0, 10).await; + let (payload_batch, write_batch) = env.build_log_batch(wal.clone(), location, 0, 10).await; let seq = wal .write(&env.write_ctx, &write_batch) .await .expect("should succeed to write"); - check_write_batch(&env, wal, region_id, seq, &payload_batch).await + check_write_batch(&env, wal, location, seq, &payload_batch).await } -async fn simple_read_write(env: &TestEnv, region_id: RegionId) { +async fn simple_read_write(env: &TestEnv, location: Location) { let wal = env.build_wal().await; // Empty region has 0 sequence num. - let last_seq = wal.sequence_num(region_id).await.unwrap(); + let last_seq = wal.sequence_num(location).await.unwrap(); assert_eq!(0, last_seq); - simple_read_write_with_wal(env, wal.clone(), region_id).await; + simple_read_write_with_wal(env, wal.clone(), location).await; - let last_seq = wal.sequence_num(region_id).await.unwrap(); + let last_seq = wal.sequence_num(location).await.unwrap(); assert_eq!(10, last_seq); wal.close_gracefully().await.unwrap(); @@ -72,14 +75,14 @@ async fn simple_read_write(env: &TestEnv, region_id: RegionId) /// Test the read with different kinds of boundaries. 
async fn read_with_boundary(env: &TestEnv) { let wal = env.build_wal().await; - let region_id = 0; - let (payload_batch, write_batch) = env.build_log_batch(wal.clone(), region_id, 0, 10).await; + let location = Location::new(0, 0); + let (payload_batch, write_batch) = env.build_log_batch(wal.clone(), location, 0, 10).await; let end_seq = wal .write(&env.write_ctx, &write_batch) .await .expect("should succeed to write"); - let last_seq = wal.sequence_num(region_id).await.unwrap(); + let last_seq = wal.sequence_num(location).await.unwrap(); assert_eq!(end_seq, last_seq); let start_seq = end_seq + 1 - write_batch.entries.len() as u64; @@ -87,7 +90,7 @@ async fn read_with_boundary(env: &TestEnv) { // [min, max] { let read_req = ReadRequest { - region_id, + location, start: ReadBoundary::Min, end: ReadBoundary::Max, }; @@ -98,7 +101,7 @@ async fn read_with_boundary(env: &TestEnv) { // [0, 10] { let read_req = ReadRequest { - region_id, + location, start: ReadBoundary::Included(start_seq), end: ReadBoundary::Included(end_seq), }; @@ -109,7 +112,7 @@ async fn read_with_boundary(env: &TestEnv) { // (0, 10] { let read_req = ReadRequest { - region_id, + location, start: ReadBoundary::Excluded(start_seq), end: ReadBoundary::Included(end_seq), }; @@ -121,7 +124,7 @@ async fn read_with_boundary(env: &TestEnv) { // [0, 10) { let read_req = ReadRequest { - region_id, + location, start: ReadBoundary::Included(start_seq), end: ReadBoundary::Excluded(end_seq), }; @@ -139,7 +142,7 @@ async fn read_with_boundary(env: &TestEnv) { // (0, 10) { let read_req = ReadRequest { - region_id, + location, start: ReadBoundary::Excluded(start_seq), end: ReadBoundary::Excluded(end_seq), }; @@ -162,12 +165,16 @@ async fn write_multiple_regions_parallelly(env: Arc(env: Arc(env: &TestEnv) { - let region_id = 0; + let table_id = 0; let (payload_batch, write_batch, seq) = { let wal = env.build_wal().await; - let (payload_batch, write_batch) = env.build_log_batch(wal.clone(), region_id, 0, 10).await; + let (payload_batch, write_batch) = env + .build_log_batch( + wal.clone(), + Location::new(DEFAULT_SHARD_ID, table_id), + 0, + 10, + ) + .await; let seq = wal .write(&env.write_ctx, &write_batch) .await @@ -200,11 +214,14 @@ async fn reopen(env: &TestEnv) { // reopen the wal let wal = env.build_wal().await; - let last_seq = wal.sequence_num(region_id).await.unwrap(); + let last_seq = wal + .sequence_num(Location::new(DEFAULT_SHARD_ID, table_id)) + .await + .unwrap(); assert_eq!(seq, last_seq); let read_req = ReadRequest { - region_id, + location: Location::new(DEFAULT_SHARD_ID, table_id), start: ReadBoundary::Included(seq + 1 - write_batch.entries.len() as u64), end: ReadBoundary::Included(seq), }; @@ -224,19 +241,29 @@ async fn reopen(env: &TestEnv) { /// - Read the part of first batch and second batch. 
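
read_with_boundary above walks the four boundary combinations. As a compact illustration of the ranges its assertions expect, here is a small stand-in for wal::manager::ReadBoundary together with a hypothetical resolve helper; it models only the inclusive/exclusive arithmetic the test relies on, not the real iterator implementation:

// Stand-in for wal::manager::ReadBoundary (hypothetical, for illustration only).
#[derive(Debug, Clone, Copy)]
enum ReadBound {
    Min,
    Max,
    Included(u64),
    Excluded(u64),
}

// Hypothetical helper: resolve a (start, end) pair into the inclusive [lo, hi]
// range of sequence numbers the test expects a read to return.
fn resolve(start: ReadBound, end: ReadBound, min_seq: u64, max_seq: u64) -> (u64, u64) {
    let lo = match start {
        ReadBound::Min => min_seq,
        ReadBound::Max => max_seq,
        ReadBound::Included(s) => s,
        ReadBound::Excluded(s) => s + 1,
    };
    let hi = match end {
        ReadBound::Min => min_seq,
        ReadBound::Max => max_seq,
        ReadBound::Included(s) => s,
        ReadBound::Excluded(s) => s - 1,
    };
    (lo, hi)
}

fn main() {
    // Ten entries written at sequences 1..=10, as in read_with_boundary.
    assert_eq!(resolve(ReadBound::Min, ReadBound::Max, 1, 10), (1, 10));
    assert_eq!(resolve(ReadBound::Excluded(1), ReadBound::Included(10), 1, 10), (2, 10));
    assert_eq!(resolve(ReadBound::Included(1), ReadBound::Excluded(10), 1, 10), (1, 9));
}
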
async fn complex_read_write(env: &TestEnv) { let wal = env.build_wal().await; - let region_id = 0; + let table_id = 0; // write two batches let (start_val, mid_val, end_val) = (0, 10, 50); let (payload_batch1, write_batch_1) = env - .build_log_batch(wal.clone(), region_id, start_val, mid_val) + .build_log_batch( + wal.clone(), + Location::new(DEFAULT_SHARD_ID, table_id), + start_val, + mid_val, + ) .await; let seq_1 = wal .write(&env.write_ctx, &write_batch_1) .await .expect("should succeed to write"); let (payload_batch2, write_batch_2) = env - .build_log_batch(wal.clone(), region_id, mid_val, end_val) + .build_log_batch( + wal.clone(), + Location::new(DEFAULT_SHARD_ID, table_id), + mid_val, + end_val, + ) .await; let seq_2 = wal .write(&env.write_ctx, &write_batch_2) @@ -244,13 +271,34 @@ async fn complex_read_write(env: &TestEnv) { .expect("should succeed to write"); // read the first batch - check_write_batch(env, wal.clone(), region_id, seq_1, &payload_batch1).await; + check_write_batch( + env, + wal.clone(), + Location::new(DEFAULT_SHARD_ID, table_id), + seq_1, + &payload_batch1, + ) + .await; // read the second batch - check_write_batch(env, wal.clone(), region_id, seq_2, &payload_batch2).await; + check_write_batch( + env, + wal.clone(), + Location::new(DEFAULT_SHARD_ID, table_id), + seq_2, + &payload_batch2, + ) + .await; // read the whole batch let (seq_3, payload_batch3) = (seq_2, env.build_payload_batch(start_val, end_val)); - check_write_batch(env, wal.clone(), region_id, seq_3, &payload_batch3).await; + check_write_batch( + env, + wal.clone(), + Location::new(DEFAULT_SHARD_ID, table_id), + seq_3, + &payload_batch3, + ) + .await; // read the part of batch1 and batch2 let (seq_4, payload_batch4) = { @@ -259,31 +307,55 @@ async fn complex_read_write(env: &TestEnv) { let seq = seq_2 - (end_val - new_end) as u64; (seq, env.build_payload_batch(new_start, new_end)) }; - check_write_batch(env, wal.clone(), region_id, seq_4, &payload_batch4).await; + check_write_batch( + env, + wal.clone(), + Location::new(DEFAULT_SHARD_ID, table_id), + seq_4, + &payload_batch4, + ) + .await; wal.close_gracefully().await.unwrap(); } /// Test whether data can be deleted. async fn simple_write_delete(env: &TestEnv) { - let region_id = 0; + let table_id = 0; let wal = env.build_wal().await; - let (payload_batch, write_batch) = env.build_log_batch(wal.clone(), region_id, 0, 10).await; + let (payload_batch, write_batch) = env + .build_log_batch( + wal.clone(), + Location::new(DEFAULT_SHARD_ID, table_id), + 0, + 10, + ) + .await; let seq = wal .write(&env.write_ctx, &write_batch) .await .expect("should succeed to write"); - check_write_batch(env, wal.clone(), region_id, seq, &payload_batch).await; - - let last_seq = wal.sequence_num(region_id).await.unwrap(); + check_write_batch( + env, + wal.clone(), + Location::new(DEFAULT_SHARD_ID, table_id), + seq, + &payload_batch, + ) + .await; + + let last_seq = wal + .sequence_num(Location::new(DEFAULT_SHARD_ID, table_id)) + .await + .unwrap(); assert_eq!(seq, last_seq); // delete all logs - wal.mark_delete_entries_up_to(region_id, seq) + wal.mark_delete_entries_up_to(Location::new(DEFAULT_SHARD_ID, table_id), seq) .await .expect("should succeed to delete"); let read_req = ReadRequest { - region_id, + location: Location::new(DEFAULT_SHARD_ID, table_id), start: ReadBoundary::Min, end: ReadBoundary::Max, }; @@ -294,7 +366,10 @@ async fn simple_write_delete(env: &TestEnv) { env.check_log_entries(seq, &[], iter).await; // Sequence num remains unchanged. 
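
simple_write_delete pins down an invariant worth spelling out: after mark_delete_entries_up_to, reads return nothing, but the last sequence number is not rolled back. A toy, self-contained model of that behaviour (ToyWal is purely illustrative, not the table_kv or rocks implementation):

use std::collections::BTreeMap;

// Toy model: entries keyed by sequence number, plus the highest sequence ever assigned.
struct ToyWal {
    entries: BTreeMap<u64, Vec<u8>>,
    last_seq: u64,
}

impl ToyWal {
    fn new() -> Self {
        Self { entries: BTreeMap::new(), last_seq: 0 }
    }

    fn write(&mut self, payload: Vec<u8>) -> u64 {
        self.last_seq += 1;
        self.entries.insert(self.last_seq, payload);
        self.last_seq
    }

    // Mirrors mark_delete_entries_up_to: drop the entries, keep the sequence counter.
    fn mark_delete_up_to(&mut self, seq: u64) {
        self.entries.retain(|&k, _| k > seq);
    }

    fn sequence_num(&self) -> u64 {
        self.last_seq
    }
}

fn main() {
    let mut wal = ToyWal::new();
    let seq = (0u8..10).map(|i| wal.write(vec![i])).last().unwrap();
    wal.mark_delete_up_to(seq);
    assert!(wal.entries.is_empty());     // reads after delete see nothing
    assert_eq!(wal.sequence_num(), seq); // but the sequence number is unchanged
}
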
- let last_seq = wal.sequence_num(region_id).await.unwrap(); + let last_seq = wal + .sequence_num(Location::new(DEFAULT_SHARD_ID, table_id)) + .await + .unwrap(); assert_eq!(seq, last_seq); wal.close_gracefully().await.unwrap(); @@ -302,21 +377,35 @@ async fn simple_write_delete(env: &TestEnv) { /// Delete half of the written data and check the remaining half can be read. async fn write_delete_half(env: &TestEnv) { - let region_id = 0; + let table_id = 0; let wal = env.build_wal().await; - let (mut payload_batch, write_batch) = env.build_log_batch(wal.clone(), region_id, 0, 10).await; + let (mut payload_batch, write_batch) = env + .build_log_batch( + wal.clone(), + Location::new(DEFAULT_SHARD_ID, table_id), + 0, + 10, + ) + .await; let seq = wal .write(&env.write_ctx, &write_batch) .await .expect("should succeed to write"); - check_write_batch(env, wal.clone(), region_id, seq, &payload_batch).await; + check_write_batch( + env, + wal.clone(), + Location::new(DEFAULT_SHARD_ID, table_id), + seq, + &payload_batch, + ) + .await; // delete all logs - wal.mark_delete_entries_up_to(region_id, seq / 2) + wal.mark_delete_entries_up_to(Location::new(DEFAULT_SHARD_ID, table_id), seq / 2) .await .expect("should succeed to delete"); let read_req = ReadRequest { - region_id, + location: Location::new(DEFAULT_SHARD_ID, table_id), start: ReadBoundary::Min, end: ReadBoundary::Max, }; @@ -329,7 +418,10 @@ async fn write_delete_half(env: &TestEnv) { env.check_log_entries(seq, &payload_batch, iter).await; // Sequence num remains unchanged. - let last_seq = wal.sequence_num(region_id).await.unwrap(); + let last_seq = wal + .sequence_num(Location::new(DEFAULT_SHARD_ID, table_id)) + .await + .unwrap(); assert_eq!(seq, last_seq); wal.close_gracefully().await.unwrap(); @@ -337,27 +429,40 @@ async fn write_delete_half(env: &TestEnv) { /// Test delete across multiple regions. async fn write_delete_multiple_regions(env: &TestEnv) { - let (region_id_1, region_id_2) = (1, 2); + let (table_id_1, table_id_2) = (1, 2); let wal = env.build_wal().await; - let (_, write_batch_1) = env.build_log_batch(wal.clone(), region_id_1, 0, 10).await; + let (_, write_batch_1) = env + .build_log_batch( + wal.clone(), + Location::new(DEFAULT_SHARD_ID, table_id_1), + 0, + 10, + ) + .await; let seq_1 = wal .write(&env.write_ctx, &write_batch_1) .await .expect("should succeed to write"); - let (payload_batch2, write_batch_2) = - env.build_log_batch(wal.clone(), region_id_2, 10, 20).await; + let (payload_batch2, write_batch_2) = env + .build_log_batch( + wal.clone(), + Location::new(DEFAULT_SHARD_ID, table_id_2), + 10, + 20, + ) + .await; let seq_2 = wal .write(&env.write_ctx, &write_batch_2) .await .expect("should succeed to write"); // delete all logs of region 1. 
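
The multi-table case below additionally checks isolation: deleting up to a sequence for one table's Location must leave another table on the same shard untouched. A hypothetical per-table map sketches that expectation (illustrative only; the real backends key their storage differently):

use std::collections::{BTreeMap, HashMap};

// Illustrative only: table_id -> (sequence -> payload).
type PerTableLog = HashMap<u64, BTreeMap<u64, Vec<u8>>>;

fn delete_up_to(log: &mut PerTableLog, table_id: u64, seq: u64) {
    if let Some(entries) = log.get_mut(&table_id) {
        entries.retain(|&s, _| s > seq);
    }
}

fn main() {
    let mut log = PerTableLog::new();
    log.entry(1).or_default().insert(10, b"t1".to_vec());
    log.entry(2).or_default().insert(11, b"t2".to_vec());

    delete_up_to(&mut log, 1, 10);

    assert!(log[&1].is_empty());  // table 1: entries up to seq 10 are gone
    assert_eq!(log[&2].len(), 1); // table 2 on the same shard is untouched
}
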
- wal.mark_delete_entries_up_to(region_id_1, seq_1) + wal.mark_delete_entries_up_to(Location::new(DEFAULT_SHARD_ID, table_id_1), seq_1) .await .expect("should succeed to delete"); let read_req = ReadRequest { - region_id: region_id_1, + location: Location::new(DEFAULT_SHARD_ID, table_id_1), start: ReadBoundary::Min, end: ReadBoundary::Max, }; @@ -367,26 +472,54 @@ async fn write_delete_multiple_regions(env: &TestEnv) { .expect("should succeed to read"); env.check_log_entries(seq_1, &[], iter).await; - check_write_batch(env, wal.clone(), region_id_2, seq_2, &payload_batch2).await; + check_write_batch( + env, + wal.clone(), + Location::new(DEFAULT_SHARD_ID, table_id_2), + seq_2, + &payload_batch2, + ) + .await; wal.close_gracefully().await.unwrap(); } /// The sequence number should increase monotonically after multiple writes. async fn sequence_increase_monotonically_multiple_writes(env: &TestEnv) { - let region_id = 0; + let table_id = 0; let wal = env.build_wal().await; - let (_, write_batch1) = env.build_log_batch(wal.clone(), region_id, 0, 10).await; + let (_, write_batch1) = env + .build_log_batch( + wal.clone(), + Location::new(DEFAULT_SHARD_ID, table_id), + 0, + 10, + ) + .await; let seq_1 = wal .write(&env.write_ctx, &write_batch1) .await .expect("should succeed to write"); - let (_, write_batch2) = env.build_log_batch(wal.clone(), region_id, 0, 10).await; + let (_, write_batch2) = env + .build_log_batch( + wal.clone(), + Location::new(DEFAULT_SHARD_ID, table_id), + 0, + 10, + ) + .await; let seq_2 = wal .write(&env.write_ctx, &write_batch2) .await .expect("should succeed to write"); - let (_, write_batch3) = env.build_log_batch(wal.clone(), region_id, 0, 10).await; + let (_, write_batch3) = env + .build_log_batch( + wal.clone(), + Location::new(DEFAULT_SHARD_ID, table_id), + 0, + 10, + ) + .await; let seq_3 = wal .write(&env.write_ctx, &write_batch3) @@ -402,26 +535,43 @@ async fn sequence_increase_monotonically_multiple_writes(env: &Te /// The sequence number should increase monotonically after write, delete and /// one more write. async fn sequence_increase_monotonically_delete_write(env: &TestEnv) { - let region_id = 0; + let table_id = 0; let wal = env.build_wal().await; - let (_, write_batch1) = env.build_log_batch(wal.clone(), region_id, 0, 10).await; + let (_, write_batch1) = env + .build_log_batch( + wal.clone(), + Location::new(DEFAULT_SHARD_ID, table_id), + 0, + 10, + ) + .await; // write let seq_1 = wal .write(&env.write_ctx, &write_batch1) .await .expect("should succeed to write"); // delete - wal.mark_delete_entries_up_to(region_id, seq_1) + wal.mark_delete_entries_up_to(Location::new(DEFAULT_SHARD_ID, table_id), seq_1) .await .expect("should succeed to delete"); - let (_, write_batch2) = env.build_log_batch(wal.clone(), region_id, 0, 10).await; + let (_, write_batch2) = env + .build_log_batch( + wal.clone(), + Location::new(DEFAULT_SHARD_ID, table_id), + 0, + 10, + ) + .await; // write again let seq_2 = wal .write(&env.write_ctx, &write_batch2) .await .expect("should succeed to write"); - let last_seq = wal.sequence_num(region_id).await.unwrap(); + let last_seq = wal + .sequence_num(Location::new(DEFAULT_SHARD_ID, table_id)) + .await + .unwrap(); assert_eq!(seq_2, last_seq); assert!(seq_2 > seq_1); @@ -432,16 +582,23 @@ async fn sequence_increase_monotonically_delete_write(env: &TestE /// The sequence number should increase monotonically after write, delete, /// reopen and write. 
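
Since nearly every call site in these tests now repeats Location::new(DEFAULT_SHARD_ID, table_id), a tiny helper in wal/src/tests/util.rs could cut the noise. This is only a suggestion, not part of the patch, and the table id parameter type below is an assumption:

// Hypothetical helper (not in this patch): hide the DEFAULT_SHARD_ID plumbing.
use common_types::table::{Location, DEFAULT_SHARD_ID};

pub fn test_location(table_id: u64) -> Location {
    // The table id type is assumed here; use whatever Location::new actually expects.
    Location::new(DEFAULT_SHARD_ID, table_id)
}

Call sites such as env.build_log_batch(wal.clone(), test_location(table_id), 0, 10) would then stay on one line.
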
async fn sequence_increase_monotonically_delete_reopen_write(env: &TestEnv) { - let region_id = 0; + let table_id = 0; let wal = env.build_wal().await; - let (_, write_batch1) = env.build_log_batch(wal.clone(), region_id, 0, 10).await; + let (_, write_batch1) = env + .build_log_batch( + wal.clone(), + Location::new(DEFAULT_SHARD_ID, table_id), + 0, + 10, + ) + .await; // write let seq_1 = wal .write(&env.write_ctx, &write_batch1) .await .expect("should succeed to write"); // delete - wal.mark_delete_entries_up_to(region_id, seq_1) + wal.mark_delete_entries_up_to(Location::new(DEFAULT_SHARD_ID, table_id), seq_1) .await .expect("should succeed to delete"); @@ -451,13 +608,23 @@ async fn sequence_increase_monotonically_delete_reopen_write(env: let wal = env.build_wal().await; // write again - let (_, write_batch2) = env.build_log_batch(wal.clone(), region_id, 0, 10).await; + let (_, write_batch2) = env + .build_log_batch( + wal.clone(), + Location::new(DEFAULT_SHARD_ID, table_id), + 0, + 10, + ) + .await; let seq_2 = wal .write(&env.write_ctx, &write_batch2) .await .expect("should succeed to write"); - let last_seq = wal.sequence_num(region_id).await.unwrap(); + let last_seq = wal + .sequence_num(Location::new(DEFAULT_SHARD_ID, table_id)) + .await + .unwrap(); assert_eq!(seq_2, last_seq); assert!(seq_2 > seq_1); @@ -467,32 +634,52 @@ async fn sequence_increase_monotonically_delete_reopen_write(env: #[test] fn test_simple_read_write_default_batch() { + let table_id = 0; let env = RocksTestEnv::new(2, RocksWalBuilder::default()); - env.runtime.block_on(simple_read_write(&env, 0)); + env.runtime.block_on(simple_read_write( + &env, + Location::new(DEFAULT_SHARD_ID, table_id), + )); let env = TableKvTestEnv::new(2, MemoryTableWalBuilder::default()); - env.runtime.block_on(simple_read_write(&env, 0)); + env.runtime.block_on(simple_read_write( + &env, + Location::new(DEFAULT_SHARD_ID, table_id), + )); let env = TableKvTestEnv::new(2, MemoryTableWalBuilder::with_ttl("1d")); - env.runtime.block_on(simple_read_write(&env, 0)); + env.runtime.block_on(simple_read_write( + &env, + Location::new(DEFAULT_SHARD_ID, table_id), + )); } #[test] fn test_simple_read_write_different_batch_size() { + let table_id = 0; let batch_sizes = [1, 2, 4, 10, 100]; for batch_size in batch_sizes { let mut env = RocksTestEnv::new(2, RocksWalBuilder::default()); env.read_ctx.batch_size = batch_size; - env.runtime.block_on(simple_read_write(&env, 0)); + env.runtime.block_on(simple_read_write( + &env, + Location::new(DEFAULT_SHARD_ID, table_id), + )); let mut env = TableKvTestEnv::new(2, MemoryTableWalBuilder::default()); env.read_ctx.batch_size = batch_size; - env.runtime.block_on(simple_read_write(&env, 0)); + env.runtime.block_on(simple_read_write( + &env, + Location::new(DEFAULT_SHARD_ID, table_id), + )); let mut env = TableKvTestEnv::new(2, MemoryTableWalBuilder::with_ttl("1d")); env.read_ctx.batch_size = batch_size; - env.runtime.block_on(simple_read_write(&env, 0)); + env.runtime.block_on(simple_read_write( + &env, + Location::new(DEFAULT_SHARD_ID, table_id), + )); } } diff --git a/wal/src/tests/util.rs b/wal/src/tests/util.rs index bdf3f2c70a..c5007cdeb1 100644 --- a/wal/src/tests/util.rs +++ b/wal/src/tests/util.rs @@ -7,6 +7,7 @@ use std::{collections::VecDeque, path::Path, str::FromStr, sync::Arc}; use async_trait::async_trait; use common_types::{ bytes::{MemBuf, MemBufMut}, + table::Location, SequenceNumber, }; use common_util::{ @@ -20,8 +21,8 @@ use tempfile::TempDir; use crate::{ log_batch::{LogWriteBatch, Payload, 
PayloadDecoder}, manager::{ - BatchLogIterator, BatchLogIteratorAdapter, ReadContext, RegionId, WalManager, - WalManagerRef, WriteContext, + BatchLogIterator, BatchLogIteratorAdapter, ReadContext, WalManager, WalManagerRef, + WriteContext, }, rocks_impl::{self, manager::RocksImpl}, table_kv_impl::{model::NamespaceConfig, wal::WalNamespaceImpl, WalRuntimes}, @@ -144,14 +145,14 @@ impl TestEnv { pub async fn build_log_batch( &self, wal: WalManagerRef, - region_id: RegionId, + location: Location, start: u32, end: u32, ) -> (Vec, LogWriteBatch) { let log_entries = (start..end).collect::>(); let log_batch_encoder = wal - .encoder(region_id) + .encoder(location) .expect("should succeed to create log batch encoder"); let log_batch = log_batch_encoder