Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove Meta and Wal type params in Instance #163

Merged
merged 3 commits into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions analytic_engine/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use crate::{
PickerManager, TableCompactionRequest, WaitError, WaiterNotifier,
},
instance::SpaceStore,
meta::Manifest,
table::data::TableDataRef,
TableOptions,
};
Expand Down Expand Up @@ -219,8 +218,8 @@ pub struct SchedulerImpl {
}

impl SchedulerImpl {
pub fn new<Wal: Send + Sync + 'static, Meta: Manifest + Send + Sync + 'static>(
space_store: Arc<SpaceStore<Wal, Meta>>,
pub fn new(
space_store: Arc<SpaceStore>,
runtime: Arc<Runtime>,
config: SchedulerConfig,
) -> Self {
Expand Down Expand Up @@ -293,10 +292,10 @@ impl OngoingTask {
}
}

struct ScheduleWorker<Wal, Meta> {
struct ScheduleWorker {
sender: Sender<ScheduleTask>,
receiver: Receiver<ScheduleTask>,
space_store: Arc<SpaceStore<Wal, Meta>>,
space_store: Arc<SpaceStore>,
runtime: Arc<Runtime>,
schedule_interval: Duration,
picker_manager: PickerManager,
Expand All @@ -314,7 +313,7 @@ async fn schedule_table_compaction(sender: Sender<ScheduleTask>, request: TableC
}
}

impl<Wal: Send + Sync + 'static, Meta: Manifest + Send + Sync + 'static> ScheduleWorker<Wal, Meta> {
impl ScheduleWorker {
async fn schedule_loop(&mut self) {
while self.running.load(Ordering::Relaxed) {
// TODO(yingwen): Maybe add a random offset to the interval.
Expand Down
35 changes: 9 additions & 26 deletions analytic_engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,49 +15,37 @@ use table_engine::{
table::{SchemaId, TableRef},
ANALYTIC_ENGINE_TYPE,
};
use wal::{manager::WalManager, rocks_impl::manager::RocksImpl};

use crate::{
context::CommonContext,
instance::InstanceRef,
meta::{details::ManifestImpl, Manifest},
space::SpaceId,
table::TableImpl,
ObkvWal,
};

use crate::{context::CommonContext, instance::InstanceRef, space::SpaceId, table::TableImpl};

/// TableEngine implementation
pub struct TableEngineImpl<Wal, Meta> {
pub struct TableEngineImpl {
/// Instance of the table engine
instance: InstanceRef<Wal, Meta>,
instance: InstanceRef,
}

impl<Wal, Meta> Clone for TableEngineImpl<Wal, Meta> {
impl Clone for TableEngineImpl {
fn clone(&self) -> Self {
Self {
instance: self.instance.clone(),
}
}
}

impl<Wal: WalManager + Send + Sync + 'static, Meta: Manifest + Send + Sync + 'static>
TableEngineImpl<Wal, Meta>
{
pub fn new(instance: InstanceRef<Wal, Meta>) -> Self {
impl TableEngineImpl {
pub fn new(instance: InstanceRef) -> Self {
Self { instance }
}
}

impl<Wal, Meta> Drop for TableEngineImpl<Wal, Meta> {
impl Drop for TableEngineImpl {
fn drop(&mut self) {
info!("Table engine dropped");
}
}

#[async_trait]
impl<Wal: WalManager + Send + Sync + 'static, Meta: Manifest + Send + Sync + 'static> TableEngine
for TableEngineImpl<Wal, Meta>
{
impl TableEngine for TableEngineImpl {
fn engine_type(&self) -> &str {
ANALYTIC_ENGINE_TYPE
}
Expand Down Expand Up @@ -159,11 +147,6 @@ impl<Wal: WalManager + Send + Sync + 'static, Meta: Manifest + Send + Sync + 'st
}
}

/// Reference to instance based on rocksdb wal.
pub(crate) type RocksInstanceRef = InstanceRef<RocksImpl, ManifestImpl<RocksImpl>>;
/// Reference to instance replicating data by obkv wal.
pub(crate) type ReplicatedInstanceRef = InstanceRef<ObkvWal, ManifestImpl<ObkvWal>>;

/// Generate the space id from the schema id with assumption schema id is unique
/// globally.
#[inline]
Expand Down
14 changes: 2 additions & 12 deletions analytic_engine/src/instance/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use log::info;
use snafu::{ensure, ResultExt};
use table_engine::table::AlterSchemaRequest;
use tokio::sync::oneshot;
use wal::manager::WalManager;

use crate::{
instance::{
Expand All @@ -21,20 +20,13 @@ use crate::{
write_worker::{AlterOptionsCommand, AlterSchemaCommand, WorkerLocal},
Instance,
},
meta::{
meta_update::{AlterOptionsMeta, AlterSchemaMeta, MetaUpdate},
Manifest,
},
meta::meta_update::{AlterOptionsMeta, AlterSchemaMeta, MetaUpdate},
space::SpaceAndTable,
table::data::TableDataRef,
table_options,
};

impl<Wal, Meta> Instance<Wal, Meta>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
{
impl Instance {
// Alter schema need to be handled by write worker.
pub async fn alter_schema_of_table(
&self,
Expand Down Expand Up @@ -109,7 +101,6 @@ where
.manifest
.store_update(meta_update)
.await
.map_err(|e| Box::new(e) as _)
.context(WriteManifest {
space_id: space_table.space().id,
table: &table_data.name,
Expand Down Expand Up @@ -242,7 +233,6 @@ where
.manifest
.store_update(meta_update)
.await
.map_err(|e| Box::new(e) as _)
.context(WriteManifest {
space_id: space_table.space().id,
table: &table_data.name,
Expand Down
8 changes: 1 addition & 7 deletions analytic_engine/src/instance/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use log::{info, warn};
use snafu::ResultExt;
use table_engine::engine::CloseTableRequest;
use tokio::sync::oneshot;
use wal::manager::WalManager;

use crate::{
instance::{
Expand All @@ -17,15 +16,10 @@ use crate::{
write_worker::{self, CloseTableCommand, WorkerLocal},
Instance,
},
meta::Manifest,
space::SpaceRef,
};

impl<Wal, Meta> Instance<Wal, Meta>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
{
impl Instance {
/// Close table need to be handled by write worker.
pub async fn do_close_table(&self, space: SpaceRef, request: CloseTableRequest) -> Result<()> {
info!("Instance close table, request:{:?}", request);
Expand Down
13 changes: 2 additions & 11 deletions analytic_engine/src/instance/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,20 @@ use log::info;
use snafu::ResultExt;
use table_engine::engine::CreateTableRequest;
use tokio::sync::oneshot;
use wal::manager::WalManager;

use crate::{
instance::{
engine::{CreateTableData, InvalidOptions, OperateByWriteWorker, Result, WriteManifest},
write_worker::{self, CreateTableCommand, WorkerLocal},
Instance,
},
meta::{
meta_update::{AddTableMeta, MetaUpdate},
Manifest,
},
meta::meta_update::{AddTableMeta, MetaUpdate},
space::SpaceRef,
table::data::{TableData, TableDataRef},
table_options,
};

impl<Wal, Meta> Instance<Wal, Meta>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
{
impl Instance {
/// Create table need to be handled by write worker.
pub async fn do_create_table(
&self,
Expand Down Expand Up @@ -114,7 +106,6 @@ where
.manifest
.store_update(update)
.await
.map_err(|e| Box::new(e) as _)
.context(WriteManifest {
space_id: space.id,
table: &table_data.name,
Expand Down
13 changes: 2 additions & 11 deletions analytic_engine/src/instance/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use log::{info, warn};
use snafu::ResultExt;
use table_engine::engine::DropTableRequest;
use tokio::sync::oneshot;
use wal::manager::WalManager;

use crate::{
instance::{
Expand All @@ -17,18 +16,11 @@ use crate::{
write_worker::{self, DropTableCommand, WorkerLocal},
Instance,
},
meta::{
meta_update::{DropTableMeta, MetaUpdate},
Manifest,
},
meta::meta_update::{DropTableMeta, MetaUpdate},
space::SpaceRef,
};

impl<Wal, Meta> Instance<Wal, Meta>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
{
impl Instance {
/// Drop a table under given space
pub async fn do_drop_table(
self: &Arc<Self>,
Expand Down Expand Up @@ -109,7 +101,6 @@ where
.manifest
.store_update(update)
.await
.map_err(|e| Box::new(e) as _)
.context(WriteManifest {
space_id: space.id,
table: &table_data.name,
Expand Down
8 changes: 1 addition & 7 deletions analytic_engine/src/instance/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ use table_engine::{
engine::{CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest},
table::TableId,
};
use wal::manager::WalManager;

use crate::{
context::CommonContext,
instance::{write_worker::WriteGroup, Instance},
meta::Manifest,
space::{Space, SpaceAndTable, SpaceId, SpaceRef},
};

Expand Down Expand Up @@ -205,11 +203,7 @@ impl From<Error> for table_engine::engine::Error {
}
}

impl<Wal, Meta> Instance<Wal, Meta>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
{
impl Instance {
/// Find space by name, create if the space is not exists
pub async fn find_or_create_space(
self: &Arc<Self>,
Expand Down
18 changes: 4 additions & 14 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use log::{error, info};
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::{predicate::Predicate, table::Result as TableResult};
use tokio::sync::oneshot;
use wal::manager::{RegionId, WalManager};
use wal::manager::RegionId;

use crate::{
compaction::{
Expand All @@ -33,10 +33,7 @@ use crate::{
Instance, SpaceStore,
},
memtable::{ColumnarIterPtr, MemTableRef, ScanContext, ScanRequest},
meta::{
meta_update::{AlterOptionsMeta, MetaUpdate, VersionEditMeta},
Manifest,
},
meta::meta_update::{AlterOptionsMeta, MetaUpdate, VersionEditMeta},
row_iter::{
self,
dedup::DedupIterator,
Expand Down Expand Up @@ -181,11 +178,7 @@ pub enum TableFlushPolicy {
Purge,
}

impl<Wal, Meta> Instance<Wal, Meta>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
{
impl Instance {
/// Flush this table.
pub async fn flush_table(
&self,
Expand Down Expand Up @@ -291,7 +284,6 @@ where
.manifest
.store_update(meta_update)
.await
.map_err(|e| Box::new(e) as _)
.context(StoreVersionEdit)?;

table_data.set_table_options(worker_local, new_table_opts);
Expand Down Expand Up @@ -530,7 +522,6 @@ where
.manifest
.store_update(meta_update)
.await
.map_err(|e| Box::new(e) as _)
.context(StoreVersionEdit)?;

// Edit table version to remove dumped memtables.
Expand Down Expand Up @@ -765,7 +756,7 @@ where
}
}

impl<Wal, Meta: Manifest> SpaceStore<Wal, Meta> {
impl SpaceStore {
pub(crate) async fn compact_table(
&self,
runtime: Arc<Runtime>,
Expand Down Expand Up @@ -806,7 +797,6 @@ impl<Wal, Meta: Manifest> SpaceStore<Wal, Meta> {
self.manifest
.store_update(meta_update)
.await
.map_err(|e| Box::new(e) as _)
.context(StoreVersionEdit)?;

// Apply to the table version.
Expand Down
Loading