diff --git a/analytic_engine/src/compaction/scheduler.rs b/analytic_engine/src/compaction/scheduler.rs index 709c333192..06d60a6a0c 100644 --- a/analytic_engine/src/compaction/scheduler.rs +++ b/analytic_engine/src/compaction/scheduler.rs @@ -37,7 +37,6 @@ use crate::{ PickerManager, TableCompactionRequest, WaitError, WaiterNotifier, }, instance::SpaceStore, - meta::Manifest, table::data::TableDataRef, TableOptions, }; @@ -219,8 +218,8 @@ pub struct SchedulerImpl { } impl SchedulerImpl { - pub fn new( - space_store: Arc>, + pub fn new( + space_store: Arc, runtime: Arc, config: SchedulerConfig, ) -> Self { @@ -293,10 +292,10 @@ impl OngoingTask { } } -struct ScheduleWorker { +struct ScheduleWorker { sender: Sender, receiver: Receiver, - space_store: Arc>, + space_store: Arc, runtime: Arc, schedule_interval: Duration, picker_manager: PickerManager, @@ -314,7 +313,7 @@ async fn schedule_table_compaction(sender: Sender, request: TableC } } -impl ScheduleWorker { +impl ScheduleWorker { async fn schedule_loop(&mut self) { while self.running.load(Ordering::Relaxed) { // TODO(yingwen): Maybe add a random offset to the interval. diff --git a/analytic_engine/src/engine.rs b/analytic_engine/src/engine.rs index b3f05b02df..53e4338c8f 100644 --- a/analytic_engine/src/engine.rs +++ b/analytic_engine/src/engine.rs @@ -15,24 +15,16 @@ use table_engine::{ table::{SchemaId, TableRef}, ANALYTIC_ENGINE_TYPE, }; -use wal::{manager::WalManager, rocks_impl::manager::RocksImpl}; - -use crate::{ - context::CommonContext, - instance::InstanceRef, - meta::{details::ManifestImpl, Manifest}, - space::SpaceId, - table::TableImpl, - ObkvWal, -}; + +use crate::{context::CommonContext, instance::InstanceRef, space::SpaceId, table::TableImpl}; /// TableEngine implementation -pub struct TableEngineImpl { +pub struct TableEngineImpl { /// Instance of the table engine - instance: InstanceRef, + instance: InstanceRef, } -impl Clone for TableEngineImpl { +impl Clone for TableEngineImpl { fn clone(&self) -> Self { Self { instance: self.instance.clone(), @@ -40,24 +32,20 @@ impl Clone for TableEngineImpl { } } -impl - TableEngineImpl -{ - pub fn new(instance: InstanceRef) -> Self { +impl TableEngineImpl { + pub fn new(instance: InstanceRef) -> Self { Self { instance } } } -impl Drop for TableEngineImpl { +impl Drop for TableEngineImpl { fn drop(&mut self) { info!("Table engine dropped"); } } #[async_trait] -impl TableEngine - for TableEngineImpl -{ +impl TableEngine for TableEngineImpl { fn engine_type(&self) -> &str { ANALYTIC_ENGINE_TYPE } @@ -159,11 +147,6 @@ impl>; -/// Reference to instance replicating data by obkv wal. -pub(crate) type ReplicatedInstanceRef = InstanceRef>; - /// Generate the space id from the schema id with assumption schema id is unique /// globally. #[inline] diff --git a/analytic_engine/src/instance/alter.rs b/analytic_engine/src/instance/alter.rs index ddf6f9ee71..ccda3621dc 100644 --- a/analytic_engine/src/instance/alter.rs +++ b/analytic_engine/src/instance/alter.rs @@ -8,7 +8,6 @@ use log::info; use snafu::{ensure, ResultExt}; use table_engine::table::AlterSchemaRequest; use tokio::sync::oneshot; -use wal::manager::WalManager; use crate::{ instance::{ @@ -21,20 +20,13 @@ use crate::{ write_worker::{AlterOptionsCommand, AlterSchemaCommand, WorkerLocal}, Instance, }, - meta::{ - meta_update::{AlterOptionsMeta, AlterSchemaMeta, MetaUpdate}, - Manifest, - }, + meta::meta_update::{AlterOptionsMeta, AlterSchemaMeta, MetaUpdate}, space::SpaceAndTable, table::data::TableDataRef, table_options, }; -impl Instance -where - Wal: WalManager + Send + Sync + 'static, - Meta: Manifest + Send + Sync + 'static, -{ +impl Instance { // Alter schema need to be handled by write worker. pub async fn alter_schema_of_table( &self, @@ -109,7 +101,6 @@ where .manifest .store_update(meta_update) .await - .map_err(|e| Box::new(e) as _) .context(WriteManifest { space_id: space_table.space().id, table: &table_data.name, @@ -242,7 +233,6 @@ where .manifest .store_update(meta_update) .await - .map_err(|e| Box::new(e) as _) .context(WriteManifest { space_id: space_table.space().id, table: &table_data.name, diff --git a/analytic_engine/src/instance/close.rs b/analytic_engine/src/instance/close.rs index 99369f753e..e0b6afe49e 100644 --- a/analytic_engine/src/instance/close.rs +++ b/analytic_engine/src/instance/close.rs @@ -8,7 +8,6 @@ use log::{info, warn}; use snafu::ResultExt; use table_engine::engine::CloseTableRequest; use tokio::sync::oneshot; -use wal::manager::WalManager; use crate::{ instance::{ @@ -17,15 +16,10 @@ use crate::{ write_worker::{self, CloseTableCommand, WorkerLocal}, Instance, }, - meta::Manifest, space::SpaceRef, }; -impl Instance -where - Wal: WalManager + Send + Sync + 'static, - Meta: Manifest + Send + Sync + 'static, -{ +impl Instance { /// Close table need to be handled by write worker. pub async fn do_close_table(&self, space: SpaceRef, request: CloseTableRequest) -> Result<()> { info!("Instance close table, request:{:?}", request); diff --git a/analytic_engine/src/instance/create.rs b/analytic_engine/src/instance/create.rs index ba3ca95a8a..c4d6c23ee2 100644 --- a/analytic_engine/src/instance/create.rs +++ b/analytic_engine/src/instance/create.rs @@ -8,7 +8,6 @@ use log::info; use snafu::ResultExt; use table_engine::engine::CreateTableRequest; use tokio::sync::oneshot; -use wal::manager::WalManager; use crate::{ instance::{ @@ -16,20 +15,13 @@ use crate::{ write_worker::{self, CreateTableCommand, WorkerLocal}, Instance, }, - meta::{ - meta_update::{AddTableMeta, MetaUpdate}, - Manifest, - }, + meta::meta_update::{AddTableMeta, MetaUpdate}, space::SpaceRef, table::data::{TableData, TableDataRef}, table_options, }; -impl Instance -where - Wal: WalManager + Send + Sync + 'static, - Meta: Manifest + Send + Sync + 'static, -{ +impl Instance { /// Create table need to be handled by write worker. pub async fn do_create_table( &self, @@ -114,7 +106,6 @@ where .manifest .store_update(update) .await - .map_err(|e| Box::new(e) as _) .context(WriteManifest { space_id: space.id, table: &table_data.name, diff --git a/analytic_engine/src/instance/drop.rs b/analytic_engine/src/instance/drop.rs index 13c12c11eb..956c8fc49b 100644 --- a/analytic_engine/src/instance/drop.rs +++ b/analytic_engine/src/instance/drop.rs @@ -8,7 +8,6 @@ use log::{info, warn}; use snafu::ResultExt; use table_engine::engine::DropTableRequest; use tokio::sync::oneshot; -use wal::manager::WalManager; use crate::{ instance::{ @@ -17,18 +16,11 @@ use crate::{ write_worker::{self, DropTableCommand, WorkerLocal}, Instance, }, - meta::{ - meta_update::{DropTableMeta, MetaUpdate}, - Manifest, - }, + meta::meta_update::{DropTableMeta, MetaUpdate}, space::SpaceRef, }; -impl Instance -where - Wal: WalManager + Send + Sync + 'static, - Meta: Manifest + Send + Sync + 'static, -{ +impl Instance { /// Drop a table under given space pub async fn do_drop_table( self: &Arc, @@ -109,7 +101,6 @@ where .manifest .store_update(update) .await - .map_err(|e| Box::new(e) as _) .context(WriteManifest { space_id: space.id, table: &table_data.name, diff --git a/analytic_engine/src/instance/engine.rs b/analytic_engine/src/instance/engine.rs index 490f6fcaad..495bc312d2 100644 --- a/analytic_engine/src/instance/engine.rs +++ b/analytic_engine/src/instance/engine.rs @@ -11,12 +11,10 @@ use table_engine::{ engine::{CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest}, table::TableId, }; -use wal::manager::WalManager; use crate::{ context::CommonContext, instance::{write_worker::WriteGroup, Instance}, - meta::Manifest, space::{Space, SpaceAndTable, SpaceId, SpaceRef}, }; @@ -205,11 +203,7 @@ impl From for table_engine::engine::Error { } } -impl Instance -where - Wal: WalManager + Send + Sync + 'static, - Meta: Manifest + Send + Sync + 'static, -{ +impl Instance { /// Find space by name, create if the space is not exists pub async fn find_or_create_space( self: &Arc, diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index b6dd34525b..be016a9d91 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -22,7 +22,7 @@ use log::{error, info}; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::{predicate::Predicate, table::Result as TableResult}; use tokio::sync::oneshot; -use wal::manager::{RegionId, WalManager}; +use wal::manager::RegionId; use crate::{ compaction::{ @@ -33,10 +33,7 @@ use crate::{ Instance, SpaceStore, }, memtable::{ColumnarIterPtr, MemTableRef, ScanContext, ScanRequest}, - meta::{ - meta_update::{AlterOptionsMeta, MetaUpdate, VersionEditMeta}, - Manifest, - }, + meta::meta_update::{AlterOptionsMeta, MetaUpdate, VersionEditMeta}, row_iter::{ self, dedup::DedupIterator, @@ -181,11 +178,7 @@ pub enum TableFlushPolicy { Purge, } -impl Instance -where - Wal: WalManager + Send + Sync + 'static, - Meta: Manifest + Send + Sync + 'static, -{ +impl Instance { /// Flush this table. pub async fn flush_table( &self, @@ -291,7 +284,6 @@ where .manifest .store_update(meta_update) .await - .map_err(|e| Box::new(e) as _) .context(StoreVersionEdit)?; table_data.set_table_options(worker_local, new_table_opts); @@ -530,7 +522,6 @@ where .manifest .store_update(meta_update) .await - .map_err(|e| Box::new(e) as _) .context(StoreVersionEdit)?; // Edit table version to remove dumped memtables. @@ -765,7 +756,7 @@ where } } -impl SpaceStore { +impl SpaceStore { pub(crate) async fn compact_table( &self, runtime: Arc, @@ -806,7 +797,6 @@ impl SpaceStore { self.manifest .store_update(meta_update) .await - .map_err(|e| Box::new(e) as _) .context(StoreVersionEdit)?; // Apply to the table version. diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index 6990fbb445..ac35091e83 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -29,11 +29,11 @@ use object_store::ObjectStoreRef; use parquet::{DataCacheRef, MetaCacheRef}; use snafu::{ResultExt, Snafu}; use table_engine::engine::EngineRuntimes; -use wal::manager::WalManager; +use wal::manager::WalManagerRef; use crate::{ compaction::scheduler::CompactionSchedulerRef, - meta::Manifest, + meta::ManifestRef, space::{SpaceId, SpaceRef}, sst::{factory::FactoryRef as SstFactoryRef, file::FilePurger}, table::data::TableDataRef, @@ -85,13 +85,13 @@ impl Spaces { } } -pub struct SpaceStore { +pub struct SpaceStore { /// All spaces of the engine. spaces: RwLock, /// Manifest (or meta) stores meta data of the engine instance. - manifest: Meta, + manifest: ManifestRef, /// Wal of all tables - wal_manager: Wal, + wal_manager: WalManagerRef, /// Sst storage. store: ObjectStoreRef, /// Sst factory. @@ -101,13 +101,13 @@ pub struct SpaceStore { data_cache: Option, } -impl Drop for SpaceStore { +impl Drop for SpaceStore { fn drop(&mut self) { info!("SpaceStore dropped"); } } -impl SpaceStore { +impl SpaceStore { async fn close(&self) -> Result<()> { let spaces = self.spaces.read().unwrap().list_all_spaces(); for space in spaces { @@ -119,7 +119,7 @@ impl SpaceStore { } } -impl SpaceStore { +impl SpaceStore { fn store_ref(&self) -> &ObjectStoreRef { &self.store } @@ -142,9 +142,9 @@ impl SpaceStore { /// /// Manages all spaces, also contains needed resources shared across all table // TODO(yingwen): Track memory usage of all tables (or tables of space) -pub struct Instance { +pub struct Instance { /// Space storage - space_store: Arc>, + space_store: Arc, /// Runtime to execute async tasks. runtimes: Arc, /// Global table options, overwrite mutable options in each table's @@ -170,7 +170,7 @@ pub struct Instance { pub(crate) replay_batch_size: usize, } -impl Instance { +impl Instance { /// Close the instance gracefully. pub async fn close(&self) -> Result<()> { self.file_purger.stop().await.context(StopFilePurger)?; @@ -185,7 +185,7 @@ impl Instance { } // TODO(yingwen): Instance builder -impl Instance { +impl Instance { /// Find space using read lock fn get_space_by_read_lock(&self, space: SpaceId) -> Option { let spaces = self.space_store.spaces.read().unwrap(); @@ -222,4 +222,4 @@ impl Instance { } /// Instance reference -pub type InstanceRef = Arc>; +pub type InstanceRef = Arc; diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index f9d15420f2..c5af139221 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -15,7 +15,7 @@ use table_engine::table::TableId; use tokio::sync::oneshot; use wal::{ log_batch::LogEntry, - manager::{BatchLogIterator, ReadBoundary, ReadContext, ReadRequest, WalManager}, + manager::{BatchLogIterator, ReadBoundary, ReadContext, ReadRequest, WalManagerRef}, }; use crate::{ @@ -32,23 +32,19 @@ use crate::{ write_worker::{RecoverTableCommand, WorkerLocal, WriteGroup}, Instance, SpaceStore, Spaces, }, - meta::{meta_data::TableManifestData, Manifest}, + meta::{meta_data::TableManifestData, ManifestRef}, payload::{ReadPayload, WalDecoder}, space::{Space, SpaceId, SpaceRef}, sst::{factory::FactoryRef as SstFactoryRef, file::FilePurger}, table::data::{TableData, TableDataRef}, }; -impl Instance -where - Wal: WalManager + Send + Sync + 'static, - Meta: Manifest + Send + Sync + 'static, -{ +impl Instance { /// Open a new instance pub async fn open( ctx: OpenContext, - manifest: Meta, - wal_manager: Wal, + manifest: ManifestRef, + wal_manager: WalManagerRef, store: ObjectStoreRef, sst_factory: SstFactoryRef, ) -> Result> { @@ -203,7 +199,6 @@ where .manifest .load_data(table_id, true) .await - .map_err(|e| Box::new(e) as _) .context(ReadMetaUpdate { table_id })?; let table_data = if let Some(manifest_data) = manifest_data { diff --git a/analytic_engine/src/instance/read.rs b/analytic_engine/src/instance/read.rs index eb8a9ec34e..bfedd4b3ae 100644 --- a/analytic_engine/src/instance/read.rs +++ b/analytic_engine/src/instance/read.rs @@ -23,11 +23,9 @@ use table_engine::{ table::ReadRequest, }; use tokio::sync::mpsc::{self, Receiver}; -use wal::manager::WalManager; use crate::{ instance::Instance, - meta::Manifest, row_iter::{ chain, chain::{ChainConfig, ChainIterator}, @@ -75,7 +73,7 @@ fn need_merge_sort_streams(table_options: &TableOptions, read_request: &ReadRequ table_options.need_dedup() || read_request.order.is_in_order() } -impl Instance { +impl Instance { /// Read data in multiple time range from table, and return /// `read_parallelism` output streams. pub async fn partitioned_read_from_table( diff --git a/analytic_engine/src/instance/write.rs b/analytic_engine/src/instance/write.rs index d1e40f2566..400b81cb33 100644 --- a/analytic_engine/src/instance/write.rs +++ b/analytic_engine/src/instance/write.rs @@ -18,7 +18,7 @@ use table_engine::table::WriteRequest; use tokio::sync::oneshot; use wal::{ log_batch::{LogWriteBatch, LogWriteEntry}, - manager::{SequenceNumber, WalManager, WriteContext}, + manager::{SequenceNumber, WriteContext}, }; use crate::{ @@ -29,7 +29,6 @@ use crate::{ Instance, }, memtable::{key::KeySequence, PutContext}, - meta::Manifest, payload::WritePayload, space::SpaceAndTable, table::{ @@ -140,11 +139,7 @@ impl EncodeContext { } } -impl Instance -where - Wal: WalManager + Send + Sync + 'static, - Meta: Manifest + Send + Sync + 'static, -{ +impl Instance { /// Write data to the table under give space. pub async fn write_to_table( &self, diff --git a/analytic_engine/src/instance/write_worker.rs b/analytic_engine/src/instance/write_worker.rs index e8cb20ea5c..627d3da025 100644 --- a/analytic_engine/src/instance/write_worker.rs +++ b/analytic_engine/src/instance/write_worker.rs @@ -27,7 +27,6 @@ use table_engine::{ }, }; use tokio::sync::{mpsc, oneshot, watch, watch::Ref, Mutex, Notify}; -use wal::manager::WalManager; use crate::{ compaction::{TableCompactionRequest, WaitResult}, @@ -36,7 +35,6 @@ use crate::{ flush_compaction::{self, TableFlushOptions}, write, write_worker, InstanceRef, }, - meta::Manifest, space::{SpaceAndTable, SpaceId, SpaceRef}, table::{data::TableDataRef, metrics::Metrics}, }; @@ -555,11 +553,7 @@ pub struct WriteGroup { } impl WriteGroup { - pub fn new(opts: Options, instance: InstanceRef) -> Self - where - Wal: WalManager + Send + Sync + 'static, - Meta: Manifest + Send + Sync + 'static, - { + pub fn new(opts: Options, instance: InstanceRef) -> Self { let mut worker_datas = Vec::with_capacity(opts.worker_num); let mut handles = Vec::with_capacity(opts.worker_num); for id in 0..opts.worker_num { @@ -691,18 +685,16 @@ impl WorkerSharedData { /// /// The write worker should ensure there is only one flush thread (task) is /// running. -struct WriteWorker { +struct WriteWorker { /// Command receiver rx: mpsc::Receiver, /// Engine instance - instance: InstanceRef, + instance: InstanceRef, /// Worker local states local: WorkerLocal, } -impl - WriteWorker -{ +impl WriteWorker { /// Runs the write loop until stopped async fn run(&mut self) { // TODO(yingwen): Maybe batch write tasks to improve performance (group commit) diff --git a/analytic_engine/src/lib.rs b/analytic_engine/src/lib.rs index 4230a8f730..fb220019e7 100644 --- a/analytic_engine/src/lib.rs +++ b/analytic_engine/src/lib.rs @@ -24,14 +24,11 @@ pub mod tests; use meta::details::Options as ManifestOptions; use serde_derive::Deserialize; use storage_options::{LocalOptions, StorageOptions}; -use table_kv::{config::ObkvConfig, obkv::ObkvImpl}; -use wal::table_kv_impl::{model::NamespaceConfig, wal::WalNamespaceImpl}; +use table_kv::config::ObkvConfig; +use wal::table_kv_impl::model::NamespaceConfig; pub use crate::{compaction::scheduler::SchedulerConfig, table_options::TableOptions}; -/// Wal based on obkv. -pub(crate) type ObkvWal = WalNamespaceImpl; - /// Config of analytic engine. #[derive(Debug, Clone, Deserialize)] #[serde(default)] diff --git a/analytic_engine/src/meta/details.rs b/analytic_engine/src/meta/details.rs index 04fac30fca..ff03a177ec 100644 --- a/analytic_engine/src/meta/details.rs +++ b/analytic_engine/src/meta/details.rs @@ -22,7 +22,7 @@ use wal::{ log_batch::{LogEntry, LogWriteBatch, LogWriteEntry}, manager::{ BatchLogIterator, BatchLogIteratorAdapter, ReadBoundary, ReadContext, ReadRequest, - RegionId, SequenceNumber, WalManager, WriteContext, + RegionId, SequenceNumber, WalManagerRef, WriteContext, }, }; @@ -140,9 +140,9 @@ impl Default for Options { /// [`Snapshotter`]). /// TODO(xikai): it may be better to store the snapshot on object store. #[derive(Debug)] -pub struct ManifestImpl { +pub struct ManifestImpl { opts: Options, - wal_manager: Arc, + wal_manager: WalManagerRef, /// Number of updates wrote to wal since last snapshot. num_updates_since_snapshot: Arc, @@ -154,11 +154,11 @@ pub struct ManifestImpl { snapshot_write_guard: Arc>, } -impl ManifestImpl { - pub async fn open(wal_manager: W, opts: Options) -> Result { +impl ManifestImpl { + pub async fn open(wal_manager: WalManagerRef, opts: Options) -> Result { let manifest = Self { opts, - wal_manager: Arc::new(wal_manager), + wal_manager, num_updates_since_snapshot: Arc::new(AtomicUsize::new(0)), snapshot_write_guard: Arc::new(Mutex::new(())), }; @@ -220,10 +220,11 @@ impl ManifestImpl { } #[async_trait] -impl Manifest for ManifestImpl { - type Error = Error; - - async fn store_update(&self, update: MetaUpdate) -> Result<()> { +impl Manifest for ManifestImpl { + async fn store_update( + &self, + update: MetaUpdate, + ) -> std::result::Result<(), Box> { let table_id = update.table_id(); self.store_update_to_wal(update).await?; @@ -243,7 +244,8 @@ impl Manifest for ManifestImpl { &self, table_id: TableId, do_snapshot: bool, - ) -> Result> { + ) -> std::result::Result, Box> + { let region_id = table_id.as_u64(); if do_snapshot { if let Some(snapshot) = self.maybe_do_snapshot(table_id).await? { @@ -271,13 +273,13 @@ trait MetaUpdateLogStore: std::fmt::Debug { } #[derive(Debug, Clone)] -struct RegionWal { +struct RegionWal { region_id: RegionId, - wal_manager: Arc, + wal_manager: WalManagerRef, } -impl RegionWal { - fn new(region_id: RegionId, wal_manager: Arc) -> Self { +impl RegionWal { + fn new(region_id: RegionId, wal_manager: WalManagerRef) -> Self { Self { region_id, wal_manager, @@ -286,7 +288,7 @@ impl RegionWal { } #[async_trait] -impl MetaUpdateLogStore for RegionWal { +impl MetaUpdateLogStore for RegionWal { type Iter = MetaUpdateReaderImpl; async fn scan(&self, start: ReadBoundary, end: ReadBoundary) -> Result { @@ -649,7 +651,7 @@ mod tests { use common_util::{runtime, runtime::Runtime, tests::init_log_for_test}; use futures::future::BoxFuture; use table_engine::table::{SchemaId, TableId, TableSeqGenerator}; - use wal::rocks_impl::manager::{Builder as WalBuilder, RocksImpl}; + use wal::rocks_impl::manager::Builder as WalBuilder; use super::*; use crate::{ @@ -734,13 +736,13 @@ mod tests { format!("table_{:?}", table_id) } - async fn open_manifest(&self) -> ManifestImpl { + async fn open_manifest(&self) -> ManifestImpl { let manifest_wal = WalBuilder::with_default_rocksdb_config(self.dir.clone(), self.runtime.clone()) .build() .unwrap(); - ManifestImpl::open(manifest_wal, self.options.clone()) + ManifestImpl::open(Arc::new(manifest_wal), self.options.clone()) .await .unwrap() } @@ -749,7 +751,7 @@ mod tests { &self, table_id: TableId, expected: &Option, - manifest: &ManifestImpl, + manifest: &ManifestImpl, ) { let data = manifest.load_data(table_id, false).await.unwrap(); assert_eq!(&data, expected); @@ -823,7 +825,7 @@ mod tests { &self, table_id: TableId, manifest_data_builder: &mut TableManifestDataBuilder, - manifest: &ManifestImpl, + manifest: &ManifestImpl, ) { let add_table = self.meta_update_add_table(table_id); manifest.store_update(add_table.clone()).await.unwrap(); @@ -834,7 +836,7 @@ mod tests { &self, table_id: TableId, manifest_data_builder: &mut TableManifestDataBuilder, - manifest: &ManifestImpl, + manifest: &ManifestImpl, ) { let drop_table = self.meta_update_drop_table(table_id); manifest.store_update(drop_table.clone()).await.unwrap(); @@ -846,7 +848,7 @@ mod tests { table_id: TableId, flushed_seq: Option, manifest_data_builder: &mut TableManifestDataBuilder, - manifest: &ManifestImpl, + manifest: &ManifestImpl, ) { let version_edit = self.meta_update_version_edit(table_id, flushed_seq); manifest.store_update(version_edit.clone()).await.unwrap(); diff --git a/analytic_engine/src/meta/mod.rs b/analytic_engine/src/meta/mod.rs index 6edb8981db..1af1a84bb0 100644 --- a/analytic_engine/src/meta/mod.rs +++ b/analytic_engine/src/meta/mod.rs @@ -6,7 +6,7 @@ pub mod details; pub mod meta_data; pub mod meta_update; -use std::fmt; +use std::{fmt, sync::Arc}; use async_trait::async_trait; use meta_update::MetaUpdate; @@ -16,11 +16,12 @@ use crate::meta::meta_data::TableManifestData; /// Manifest holds meta data of all tables. #[async_trait] -pub trait Manifest: fmt::Debug { - type Error: std::error::Error + Send + Sync + 'static; - +pub trait Manifest: Send + Sync + fmt::Debug { /// Store update to manifest - async fn store_update(&self, update: MetaUpdate) -> Result<(), Self::Error>; + async fn store_update( + &self, + update: MetaUpdate, + ) -> std::result::Result<(), Box>; /// Load table meta data from manifest. /// @@ -30,5 +31,7 @@ pub trait Manifest: fmt::Debug { &self, table_id: TableId, do_snapshot: bool, - ) -> Result, Self::Error>; + ) -> Result, Box>; } + +pub type ManifestRef = Arc; diff --git a/analytic_engine/src/setup.rs b/analytic_engine/src/setup.rs index 4670c1fe28..2f49a641ac 100644 --- a/analytic_engine/src/setup.rs +++ b/analytic_engine/src/setup.rs @@ -16,16 +16,16 @@ use snafu::{ResultExt, Snafu}; use table_engine::engine::{EngineRuntimes, TableEngineRef}; use table_kv::{memory::MemoryImpl, obkv::ObkvImpl, TableKv}; use wal::{ - manager::{self, WalManager}, + manager::{self, WalManagerRef}, rocks_impl::manager::Builder as WalBuilder, table_kv_impl::{wal::WalNamespaceImpl, WalRuntimes}, }; use crate::{ context::OpenContext, - engine::{ReplicatedInstanceRef, RocksInstanceRef, TableEngineImpl}, + engine::TableEngineImpl, instance::{Instance, InstanceRef}, - meta::{details::ManifestImpl, Manifest}, + meta::{details::ManifestImpl, ManifestRef}, sst::factory::FactoryImpl, storage_options::StorageOptions, Config, @@ -71,17 +71,28 @@ const WAL_DIR_NAME: &str = "wal"; const MANIFEST_DIR_NAME: &str = "manifest"; const STORE_DIR_NAME: &str = "store"; -type InstanceRefOnTableKv = InstanceRef, ManifestImpl>>; - /// Analytic engine builder. #[async_trait] -pub trait EngineBuilder: Default { +pub trait EngineBuilder: Send + Sync + Default { /// Build the analytic engine from `config` and `engine_runtimes`. async fn build( &self, config: Config, engine_runtimes: Arc, - ) -> Result; + ) -> Result { + let (wal, manifest) = self + .open_wal_and_manifest(config.clone(), engine_runtimes.clone()) + .await?; + let store = open_storage(config.storage.clone()).await?; + let instance = open_instance(config.clone(), engine_runtimes, wal, manifest, store).await?; + Ok(Arc::new(TableEngineImpl::new(instance))) + } + + async fn open_wal_and_manifest( + &self, + config: Config, + engine_runtimes: Arc, + ) -> Result<(WalManagerRef, ManifestRef)>; } /// [RocksEngine] builder. @@ -90,16 +101,30 @@ pub struct RocksEngineBuilder; #[async_trait] impl EngineBuilder for RocksEngineBuilder { - async fn build( + async fn open_wal_and_manifest( &self, config: Config, engine_runtimes: Arc, - ) -> Result { + ) -> Result<(WalManagerRef, ManifestRef)> { assert!(!config.obkv_wal.enable); - let store = open_storage(config.storage.clone()).await?; - let instance = open_rocks_instance(config, engine_runtimes, store).await?; - Ok(Arc::new(TableEngineImpl::new(instance))) + let write_runtime = engine_runtimes.write_runtime.clone(); + let data_path = Path::new(&config.wal_path); + let wal_path = data_path.join(WAL_DIR_NAME); + let wal_manager = WalBuilder::with_default_rocksdb_config(wal_path, write_runtime.clone()) + .build() + .context(OpenWal)?; + + let manifest_path = data_path.join(MANIFEST_DIR_NAME); + let manifest_wal = WalBuilder::with_default_rocksdb_config(manifest_path, write_runtime) + .build() + .context(OpenManifestWal)?; + + let manifest = ManifestImpl::open(Arc::new(manifest_wal), config.manifest.clone()) + .await + .context(OpenManifest)?; + + Ok((Arc::new(wal_manager), Arc::new(manifest))) } } @@ -109,16 +134,22 @@ pub struct ReplicatedEngineBuilder; #[async_trait] impl EngineBuilder for ReplicatedEngineBuilder { - async fn build( + async fn open_wal_and_manifest( &self, config: Config, engine_runtimes: Arc, - ) -> Result { + ) -> Result<(WalManagerRef, ManifestRef)> { assert!(config.obkv_wal.enable); - let store = open_storage(config.storage.clone()).await?; - let instance = open_replicated_instance(config, engine_runtimes, store).await?; - Ok(Arc::new(TableEngineImpl::new(instance))) + // Notice the creation of obkv client may block current thread. + let obkv_config = config.obkv_wal.obkv.clone(); + let obkv = engine_runtimes + .write_runtime + .spawn_blocking(move || ObkvImpl::new(obkv_config).context(OpenObkv)) + .await + .context(RuntimeExec)??; + + open_wal_and_manifest_with_table_kv(config, engine_runtimes, obkv).await } } @@ -133,70 +164,20 @@ pub struct MemWalEngineBuilder { #[async_trait] impl EngineBuilder for MemWalEngineBuilder { - async fn build( + async fn open_wal_and_manifest( &self, config: Config, engine_runtimes: Arc, - ) -> Result { - let store = open_storage(config.storage.clone()).await?; - let instance = - open_instance_with_table_kv(config, engine_runtimes, self.table_kv.clone(), store) - .await?; - Ok(Arc::new(TableEngineImpl::new(instance))) + ) -> Result<(WalManagerRef, ManifestRef)> { + open_wal_and_manifest_with_table_kv(config, engine_runtimes, self.table_kv.clone()).await } } -async fn open_rocks_instance( - config: Config, - engine_runtimes: Arc, - store: ObjectStoreRef, -) -> Result { - let write_runtime = engine_runtimes.write_runtime.clone(); - let data_path = Path::new(&config.wal_path); - let wal_path = data_path.join(WAL_DIR_NAME); - let wal_manager = WalBuilder::with_default_rocksdb_config(wal_path, write_runtime.clone()) - .build() - .context(OpenWal)?; - - let manifest_path = data_path.join(MANIFEST_DIR_NAME); - let manifest_wal = WalBuilder::with_default_rocksdb_config(manifest_path, write_runtime) - .build() - .context(OpenManifestWal)?; - - let manifest = ManifestImpl::open(manifest_wal, config.manifest.clone()) - .await - .context(OpenManifest)?; - - let instance = - open_with_wal_manifest(config, engine_runtimes, wal_manager, manifest, store).await?; - - Ok(instance) -} - -async fn open_replicated_instance( - config: Config, - engine_runtimes: Arc, - store: ObjectStoreRef, -) -> Result { - assert!(config.obkv_wal.enable); - - // Notice the creation of obkv client may block current thread. - let obkv_config = config.obkv_wal.obkv.clone(); - let obkv = engine_runtimes - .write_runtime - .spawn_blocking(move || ObkvImpl::new(obkv_config).context(OpenObkv)) - .await - .context(RuntimeExec)??; - - open_instance_with_table_kv(config, engine_runtimes, obkv, store).await -} - -async fn open_instance_with_table_kv( +async fn open_wal_and_manifest_with_table_kv( config: Config, engine_runtimes: Arc, table_kv: T, - store: ObjectStoreRef, -) -> Result> { +) -> Result<(WalManagerRef, ManifestRef)> { let runtimes = WalRuntimes { read_runtime: engine_runtimes.read_runtime.clone(), write_runtime: engine_runtimes.write_runtime.clone(), @@ -214,32 +195,26 @@ async fn open_instance_with_table_kv( let manifest_wal = WalNamespaceImpl::open( table_kv, - runtimes.clone(), + runtimes, MANIFEST_DIR_NAME, config.obkv_wal.manifest.clone(), ) .await .context(OpenManifestWal)?; - let manifest = ManifestImpl::open(manifest_wal, config.manifest.clone()) + let manifest = ManifestImpl::open(Arc::new(manifest_wal), config.manifest.clone()) .await .context(OpenManifest)?; - let instance = - open_with_wal_manifest(config, engine_runtimes, wal_manager, manifest, store).await?; - Ok(instance) + Ok((Arc::new(wal_manager), Arc::new(manifest))) } -async fn open_with_wal_manifest( +async fn open_instance( config: Config, engine_runtimes: Arc, - wal_manager: Wal, - manifest: Meta, + wal_manager: WalManagerRef, + manifest: ManifestRef, store: ObjectStoreRef, -) -> Result> -where - Wal: WalManager + Send + Sync + 'static, - Meta: Manifest + Send + Sync + 'static, -{ +) -> Result { let meta_cache: Option = if let Some(sst_meta_cache_cap) = &config.sst_meta_cache_cap { Some(Arc::new(LruMetaCache::new(*sst_meta_cache_cap))) diff --git a/analytic_engine/src/table/mod.rs b/analytic_engine/src/table/mod.rs index fdc06073c5..a815467aaa 100644 --- a/analytic_engine/src/table/mod.rs +++ b/analytic_engine/src/table/mod.rs @@ -19,11 +19,9 @@ use table_engine::{ }, }; use tokio::sync::oneshot; -use wal::manager::WalManager; use crate::{ instance::{flush_compaction::TableFlushOptions, InstanceRef}, - meta::Manifest, space::SpaceAndTable, }; @@ -36,21 +34,17 @@ pub mod version_edit; // TODO(yingwen): How to handle drop table? /// Table trait implementation -pub struct TableImpl { +pub struct TableImpl { /// Space and table info space_table: SpaceAndTable, /// Instance - instance: InstanceRef, + instance: InstanceRef, /// Engine type engine_type: String, } -impl TableImpl { - pub fn new( - space_table: SpaceAndTable, - instance: InstanceRef, - engine_type: String, - ) -> Self { +impl TableImpl { + pub fn new(space_table: SpaceAndTable, instance: InstanceRef, engine_type: String) -> Self { Self { space_table, instance, @@ -59,7 +53,7 @@ impl TableImpl { } } -impl fmt::Debug for TableImpl { +impl fmt::Debug for TableImpl { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("TableImpl") .field("space_table", &self.space_table) @@ -68,9 +62,7 @@ impl fmt::Debug for TableImpl { } #[async_trait] -impl Table - for TableImpl -{ +impl Table for TableImpl { fn name(&self) -> &str { &self.space_table.table_data().name } diff --git a/components/object_store/src/lib.rs b/components/object_store/src/lib.rs index 036126d9ea..7a1aafbc46 100644 --- a/components/object_store/src/lib.rs +++ b/components/object_store/src/lib.rs @@ -12,4 +12,4 @@ pub use upstream::{ pub mod aliyun; pub mod cache; -pub type ObjectStoreRef = Arc; +pub type ObjectStoreRef = Arc; diff --git a/wal/src/manager.rs b/wal/src/manager.rs index 8943a03888..4b77a5e00e 100644 --- a/wal/src/manager.rs +++ b/wal/src/manager.rs @@ -233,7 +233,7 @@ pub trait BatchLogIterator { /// Every region has its own increasing (and maybe hallow) sequence number /// space. #[async_trait] -pub trait WalManager: fmt::Debug { +pub trait WalManager: Send + Sync + fmt::Debug { /// Get current sequence number. async fn sequence_num(&self, region_id: RegionId) -> Result; @@ -327,3 +327,5 @@ impl BatchLogIterator for BatchLogIteratorAdapter { Ok(log_entries) } } + +pub type WalManagerRef = Arc;