Skip to content

Commit

Permalink
refactor: remove Meta and Wal type params in Instance (#163)
Browse files Browse the repository at this point in the history
* refactor: remove Meta and Wal type params in Instance

* refactor: replace ManifestImpl with ManifestRef when build engine

* Fix assert obkv in EngineBuilder
  • Loading branch information
ygf11 authored Aug 5, 2022
1 parent 0847180 commit 6c61002
Show file tree
Hide file tree
Showing 20 changed files with 157 additions and 274 deletions.
11 changes: 5 additions & 6 deletions analytic_engine/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use crate::{
PickerManager, TableCompactionRequest, WaitError, WaiterNotifier,
},
instance::SpaceStore,
meta::Manifest,
table::data::TableDataRef,
TableOptions,
};
Expand Down Expand Up @@ -219,8 +218,8 @@ pub struct SchedulerImpl {
}

impl SchedulerImpl {
pub fn new<Wal: Send + Sync + 'static, Meta: Manifest + Send + Sync + 'static>(
space_store: Arc<SpaceStore<Wal, Meta>>,
pub fn new(
space_store: Arc<SpaceStore>,
runtime: Arc<Runtime>,
config: SchedulerConfig,
) -> Self {
Expand Down Expand Up @@ -293,10 +292,10 @@ impl OngoingTask {
}
}

struct ScheduleWorker<Wal, Meta> {
struct ScheduleWorker {
sender: Sender<ScheduleTask>,
receiver: Receiver<ScheduleTask>,
space_store: Arc<SpaceStore<Wal, Meta>>,
space_store: Arc<SpaceStore>,
runtime: Arc<Runtime>,
schedule_interval: Duration,
picker_manager: PickerManager,
Expand All @@ -314,7 +313,7 @@ async fn schedule_table_compaction(sender: Sender<ScheduleTask>, request: TableC
}
}

impl<Wal: Send + Sync + 'static, Meta: Manifest + Send + Sync + 'static> ScheduleWorker<Wal, Meta> {
impl ScheduleWorker {
async fn schedule_loop(&mut self) {
while self.running.load(Ordering::Relaxed) {
// TODO(yingwen): Maybe add a random offset to the interval.
Expand Down
35 changes: 9 additions & 26 deletions analytic_engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,49 +15,37 @@ use table_engine::{
table::{SchemaId, TableRef},
ANALYTIC_ENGINE_TYPE,
};
use wal::{manager::WalManager, rocks_impl::manager::RocksImpl};

use crate::{
context::CommonContext,
instance::InstanceRef,
meta::{details::ManifestImpl, Manifest},
space::SpaceId,
table::TableImpl,
ObkvWal,
};

use crate::{context::CommonContext, instance::InstanceRef, space::SpaceId, table::TableImpl};

/// TableEngine implementation
pub struct TableEngineImpl<Wal, Meta> {
pub struct TableEngineImpl {
/// Instance of the table engine
instance: InstanceRef<Wal, Meta>,
instance: InstanceRef,
}

impl<Wal, Meta> Clone for TableEngineImpl<Wal, Meta> {
impl Clone for TableEngineImpl {
fn clone(&self) -> Self {
Self {
instance: self.instance.clone(),
}
}
}

impl<Wal: WalManager + Send + Sync + 'static, Meta: Manifest + Send + Sync + 'static>
TableEngineImpl<Wal, Meta>
{
pub fn new(instance: InstanceRef<Wal, Meta>) -> Self {
impl TableEngineImpl {
pub fn new(instance: InstanceRef) -> Self {
Self { instance }
}
}

impl<Wal, Meta> Drop for TableEngineImpl<Wal, Meta> {
impl Drop for TableEngineImpl {
fn drop(&mut self) {
info!("Table engine dropped");
}
}

#[async_trait]
impl<Wal: WalManager + Send + Sync + 'static, Meta: Manifest + Send + Sync + 'static> TableEngine
for TableEngineImpl<Wal, Meta>
{
impl TableEngine for TableEngineImpl {
fn engine_type(&self) -> &str {
ANALYTIC_ENGINE_TYPE
}
Expand Down Expand Up @@ -159,11 +147,6 @@ impl<Wal: WalManager + Send + Sync + 'static, Meta: Manifest + Send + Sync + 'st
}
}

/// Reference to instance based on rocksdb wal.
pub(crate) type RocksInstanceRef = InstanceRef<RocksImpl, ManifestImpl<RocksImpl>>;
/// Reference to instance replicating data by obkv wal.
pub(crate) type ReplicatedInstanceRef = InstanceRef<ObkvWal, ManifestImpl<ObkvWal>>;

/// Generate the space id from the schema id with assumption schema id is unique
/// globally.
#[inline]
Expand Down
14 changes: 2 additions & 12 deletions analytic_engine/src/instance/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use log::info;
use snafu::{ensure, ResultExt};
use table_engine::table::AlterSchemaRequest;
use tokio::sync::oneshot;
use wal::manager::WalManager;

use crate::{
instance::{
Expand All @@ -21,20 +20,13 @@ use crate::{
write_worker::{AlterOptionsCommand, AlterSchemaCommand, WorkerLocal},
Instance,
},
meta::{
meta_update::{AlterOptionsMeta, AlterSchemaMeta, MetaUpdate},
Manifest,
},
meta::meta_update::{AlterOptionsMeta, AlterSchemaMeta, MetaUpdate},
space::SpaceAndTable,
table::data::TableDataRef,
table_options,
};

impl<Wal, Meta> Instance<Wal, Meta>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
{
impl Instance {
// Alter schema need to be handled by write worker.
pub async fn alter_schema_of_table(
&self,
Expand Down Expand Up @@ -109,7 +101,6 @@ where
.manifest
.store_update(meta_update)
.await
.map_err(|e| Box::new(e) as _)
.context(WriteManifest {
space_id: space_table.space().id,
table: &table_data.name,
Expand Down Expand Up @@ -242,7 +233,6 @@ where
.manifest
.store_update(meta_update)
.await
.map_err(|e| Box::new(e) as _)
.context(WriteManifest {
space_id: space_table.space().id,
table: &table_data.name,
Expand Down
8 changes: 1 addition & 7 deletions analytic_engine/src/instance/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use log::{info, warn};
use snafu::ResultExt;
use table_engine::engine::CloseTableRequest;
use tokio::sync::oneshot;
use wal::manager::WalManager;

use crate::{
instance::{
Expand All @@ -17,15 +16,10 @@ use crate::{
write_worker::{self, CloseTableCommand, WorkerLocal},
Instance,
},
meta::Manifest,
space::SpaceRef,
};

impl<Wal, Meta> Instance<Wal, Meta>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
{
impl Instance {
/// Close table need to be handled by write worker.
pub async fn do_close_table(&self, space: SpaceRef, request: CloseTableRequest) -> Result<()> {
info!("Instance close table, request:{:?}", request);
Expand Down
13 changes: 2 additions & 11 deletions analytic_engine/src/instance/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,20 @@ use log::info;
use snafu::ResultExt;
use table_engine::engine::CreateTableRequest;
use tokio::sync::oneshot;
use wal::manager::WalManager;

use crate::{
instance::{
engine::{CreateTableData, InvalidOptions, OperateByWriteWorker, Result, WriteManifest},
write_worker::{self, CreateTableCommand, WorkerLocal},
Instance,
},
meta::{
meta_update::{AddTableMeta, MetaUpdate},
Manifest,
},
meta::meta_update::{AddTableMeta, MetaUpdate},
space::SpaceRef,
table::data::{TableData, TableDataRef},
table_options,
};

impl<Wal, Meta> Instance<Wal, Meta>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
{
impl Instance {
/// Create table need to be handled by write worker.
pub async fn do_create_table(
&self,
Expand Down Expand Up @@ -114,7 +106,6 @@ where
.manifest
.store_update(update)
.await
.map_err(|e| Box::new(e) as _)
.context(WriteManifest {
space_id: space.id,
table: &table_data.name,
Expand Down
13 changes: 2 additions & 11 deletions analytic_engine/src/instance/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use log::{info, warn};
use snafu::ResultExt;
use table_engine::engine::DropTableRequest;
use tokio::sync::oneshot;
use wal::manager::WalManager;

use crate::{
instance::{
Expand All @@ -17,18 +16,11 @@ use crate::{
write_worker::{self, DropTableCommand, WorkerLocal},
Instance,
},
meta::{
meta_update::{DropTableMeta, MetaUpdate},
Manifest,
},
meta::meta_update::{DropTableMeta, MetaUpdate},
space::SpaceRef,
};

impl<Wal, Meta> Instance<Wal, Meta>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
{
impl Instance {
/// Drop a table under given space
pub async fn do_drop_table(
self: &Arc<Self>,
Expand Down Expand Up @@ -109,7 +101,6 @@ where
.manifest
.store_update(update)
.await
.map_err(|e| Box::new(e) as _)
.context(WriteManifest {
space_id: space.id,
table: &table_data.name,
Expand Down
8 changes: 1 addition & 7 deletions analytic_engine/src/instance/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ use table_engine::{
engine::{CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest},
table::TableId,
};
use wal::manager::WalManager;

use crate::{
context::CommonContext,
instance::{write_worker::WriteGroup, Instance},
meta::Manifest,
space::{Space, SpaceAndTable, SpaceId, SpaceRef},
};

Expand Down Expand Up @@ -205,11 +203,7 @@ impl From<Error> for table_engine::engine::Error {
}
}

impl<Wal, Meta> Instance<Wal, Meta>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
{
impl Instance {
/// Find space by name, create if the space is not exists
pub async fn find_or_create_space(
self: &Arc<Self>,
Expand Down
18 changes: 4 additions & 14 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use log::{error, info};
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::{predicate::Predicate, table::Result as TableResult};
use tokio::sync::oneshot;
use wal::manager::{RegionId, WalManager};
use wal::manager::RegionId;

use crate::{
compaction::{
Expand All @@ -33,10 +33,7 @@ use crate::{
Instance, SpaceStore,
},
memtable::{ColumnarIterPtr, MemTableRef, ScanContext, ScanRequest},
meta::{
meta_update::{AlterOptionsMeta, MetaUpdate, VersionEditMeta},
Manifest,
},
meta::meta_update::{AlterOptionsMeta, MetaUpdate, VersionEditMeta},
row_iter::{
self,
dedup::DedupIterator,
Expand Down Expand Up @@ -181,11 +178,7 @@ pub enum TableFlushPolicy {
Purge,
}

impl<Wal, Meta> Instance<Wal, Meta>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
{
impl Instance {
/// Flush this table.
pub async fn flush_table(
&self,
Expand Down Expand Up @@ -291,7 +284,6 @@ where
.manifest
.store_update(meta_update)
.await
.map_err(|e| Box::new(e) as _)
.context(StoreVersionEdit)?;

table_data.set_table_options(worker_local, new_table_opts);
Expand Down Expand Up @@ -530,7 +522,6 @@ where
.manifest
.store_update(meta_update)
.await
.map_err(|e| Box::new(e) as _)
.context(StoreVersionEdit)?;

// Edit table version to remove dumped memtables.
Expand Down Expand Up @@ -765,7 +756,7 @@ where
}
}

impl<Wal, Meta: Manifest> SpaceStore<Wal, Meta> {
impl SpaceStore {
pub(crate) async fn compact_table(
&self,
runtime: Arc<Runtime>,
Expand Down Expand Up @@ -806,7 +797,6 @@ impl<Wal, Meta: Manifest> SpaceStore<Wal, Meta> {
self.manifest
.store_update(meta_update)
.await
.map_err(|e| Box::new(e) as _)
.context(StoreVersionEdit)?;

// Apply to the table version.
Expand Down
Loading

0 comments on commit 6c61002

Please sign in to comment.