Skip to content

Commit

Permalink
refactor: remove Fa type param in Instance (apache#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
ygf11 authored Jul 18, 2022
1 parent 58f821c commit ca706a7
Show file tree
Hide file tree
Showing 22 changed files with 115 additions and 143 deletions.
20 changes: 5 additions & 15 deletions analytic_engine/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use crate::{
},
instance::SpaceStore,
meta::Manifest,
sst::factory::Factory,
table::data::TableDataRef,
TableOptions,
};
Expand Down Expand Up @@ -220,12 +219,8 @@ pub struct SchedulerImpl {
}

impl SchedulerImpl {
pub fn new<
Wal: Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
Fa: Factory + Send + Sync + 'static,
>(
space_store: Arc<SpaceStore<Wal, Meta, Fa>>,
pub fn new<Wal: Send + Sync + 'static, Meta: Manifest + Send + Sync + 'static>(
space_store: Arc<SpaceStore<Wal, Meta>>,
runtime: Arc<Runtime>,
config: SchedulerConfig,
) -> Self {
Expand Down Expand Up @@ -298,10 +293,10 @@ impl OngoingTask {
}
}

struct ScheduleWorker<Wal, Meta, Fa> {
struct ScheduleWorker<Wal, Meta> {
sender: Sender<ScheduleTask>,
receiver: Receiver<ScheduleTask>,
space_store: Arc<SpaceStore<Wal, Meta, Fa>>,
space_store: Arc<SpaceStore<Wal, Meta>>,
runtime: Arc<Runtime>,
schedule_interval: Duration,
picker_manager: PickerManager,
Expand All @@ -319,12 +314,7 @@ async fn schedule_table_compaction(sender: Sender<ScheduleTask>, request: TableC
}
}

impl<
Wal: Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
Fa: Factory + Send + Sync + 'static,
> ScheduleWorker<Wal, Meta, Fa>
{
impl<Wal: Send + Sync + 'static, Meta: Manifest + Send + Sync + 'static> ScheduleWorker<Wal, Meta> {
async fn schedule_loop(&mut self) {
while self.running.load(Ordering::Relaxed) {
// TODO(yingwen): Maybe add a random offset to the interval.
Expand Down
26 changes: 11 additions & 15 deletions analytic_engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,45 +22,41 @@ use crate::{
instance::InstanceRef,
meta::{details::ManifestImpl, Manifest},
space::SpaceId,
sst::factory::{Factory, FactoryImpl},
table::TableImpl,
ObkvWal,
};

/// TableEngine implementation
pub struct TableEngineImpl<Wal, Meta, Fa> {
pub struct TableEngineImpl<Wal, Meta> {
/// Instance of the table engine
instance: InstanceRef<Wal, Meta, Fa>,
instance: InstanceRef<Wal, Meta>,
}

impl<Wal, Meta, Fa> Clone for TableEngineImpl<Wal, Meta, Fa> {
impl<Wal, Meta> Clone for TableEngineImpl<Wal, Meta> {
fn clone(&self) -> Self {
Self {
instance: self.instance.clone(),
}
}
}

impl<Wal: WalManager + Send + Sync + 'static, Meta: Manifest + Send + Sync + 'static, Fa>
TableEngineImpl<Wal, Meta, Fa>
impl<Wal: WalManager + Send + Sync + 'static, Meta: Manifest + Send + Sync + 'static>
TableEngineImpl<Wal, Meta>
{
pub fn new(instance: InstanceRef<Wal, Meta, Fa>) -> Self {
pub fn new(instance: InstanceRef<Wal, Meta>) -> Self {
Self { instance }
}
}

impl<Wal, Meta, Fa> Drop for TableEngineImpl<Wal, Meta, Fa> {
impl<Wal, Meta> Drop for TableEngineImpl<Wal, Meta> {
fn drop(&mut self) {
info!("Table engine dropped");
}
}

#[async_trait]
impl<
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
Fa: Factory + Send + Sync + 'static,
> TableEngine for TableEngineImpl<Wal, Meta, Fa>
impl<Wal: WalManager + Send + Sync + 'static, Meta: Manifest + Send + Sync + 'static> TableEngine
for TableEngineImpl<Wal, Meta>
{
fn engine_type(&self) -> &str {
ANALYTIC_ENGINE_TYPE
Expand Down Expand Up @@ -164,9 +160,9 @@ impl<
}

/// Reference to instance based on rocksdb wal.
pub(crate) type RocksInstanceRef = InstanceRef<RocksImpl, ManifestImpl<RocksImpl>, FactoryImpl>;
pub(crate) type RocksInstanceRef = InstanceRef<RocksImpl, ManifestImpl<RocksImpl>>;
/// Reference to instance replicating data by obkv wal.
pub(crate) type ReplicatedInstanceRef = InstanceRef<ObkvWal, ManifestImpl<ObkvWal>, FactoryImpl>;
pub(crate) type ReplicatedInstanceRef = InstanceRef<ObkvWal, ManifestImpl<ObkvWal>>;

/// Generate the space id from the schema id with assumption schema id is unique
/// globally.
Expand Down
4 changes: 1 addition & 3 deletions analytic_engine/src/instance/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,14 @@ use crate::{
Manifest,
},
space::SpaceAndTable,
sst::factory::Factory,
table::data::TableDataRef,
table_options,
};

impl<Wal, Meta, Fa> Instance<Wal, Meta, Fa>
impl<Wal, Meta> Instance<Wal, Meta>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
Fa: Factory + Send + Sync + 'static,
{
// Alter schema need to be handled by write worker.
pub async fn alter_schema_of_table(
Expand Down
4 changes: 1 addition & 3 deletions analytic_engine/src/instance/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ use crate::{
},
meta::Manifest,
space::SpaceRef,
sst::factory::Factory,
};

impl<Wal, Meta, Fa> Instance<Wal, Meta, Fa>
impl<Wal, Meta> Instance<Wal, Meta>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
Fa: Factory + Send + Sync + 'static,
{
/// Close table need to be handled by write worker.
pub async fn do_close_table(&self, space: SpaceRef, request: CloseTableRequest) -> Result<()> {
Expand Down
4 changes: 1 addition & 3 deletions analytic_engine/src/instance/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@ use crate::{
Manifest,
},
space::SpaceRef,
sst::factory::Factory,
table::data::{TableData, TableDataRef},
table_options,
};

impl<Wal, Meta, Fa> Instance<Wal, Meta, Fa>
impl<Wal, Meta> Instance<Wal, Meta>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
Fa: Factory + Send + Sync + 'static,
{
/// Create table need to be handled by write worker.
pub async fn do_create_table(
Expand Down
4 changes: 1 addition & 3 deletions analytic_engine/src/instance/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@ use crate::{
Manifest,
},
space::SpaceRef,
sst::factory::Factory,
};

impl<Wal, Meta, Fa> Instance<Wal, Meta, Fa>
impl<Wal, Meta> Instance<Wal, Meta>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
Fa: Factory + Send + Sync + 'static,
{
/// Drop a table under given space
pub async fn do_drop_table(
Expand Down
4 changes: 1 addition & 3 deletions analytic_engine/src/instance/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use crate::{
instance::{write_worker::WriteGroup, Instance},
meta::Manifest,
space::{Space, SpaceAndTable, SpaceId, SpaceRef},
sst::factory::Factory,
};

#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -206,11 +205,10 @@ impl From<Error> for table_engine::engine::Error {
}
}

impl<Wal, Meta, Fa> Instance<Wal, Meta, Fa>
impl<Wal, Meta> Instance<Wal, Meta>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
Fa: Factory + Send + Sync + 'static,
{
/// Find space by name, create if the space is not exists
pub async fn find_or_create_space(
Expand Down
9 changes: 4 additions & 5 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use crate::{
space::SpaceAndTable,
sst::{
builder::RecordBatchStream,
factory::{Factory, SstBuilderOptions, SstReaderOptions, SstType},
factory::{SstBuilderOptions, SstReaderOptions, SstType},
file::{self, FileMeta, SstMetaData},
},
table::{
Expand Down Expand Up @@ -181,11 +181,10 @@ pub enum TableFlushPolicy {
Purge,
}

impl<Wal, Meta, Fa> Instance<Wal, Meta, Fa>
impl<Wal, Meta> Instance<Wal, Meta>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
Fa: Factory + Send + Sync + 'static,
{
/// Flush this table.
pub async fn flush_table(
Expand Down Expand Up @@ -766,7 +765,7 @@ where
}
}

impl<Wal, Meta: Manifest, Fa: Factory> SpaceStore<Wal, Meta, Fa> {
impl<Wal, Meta: Manifest> SpaceStore<Wal, Meta> {
pub(crate) async fn compact_table(
&self,
runtime: Arc<Runtime>,
Expand Down Expand Up @@ -883,7 +882,7 @@ impl<Wal, Meta: Manifest, Fa: Factory> SpaceStore<Wal, Meta, Fa> {
sequence,
projected_schema,
predicate: Arc::new(Predicate::empty()),
sst_factory: self.sst_factory.clone(),
sst_factory: &self.sst_factory,
sst_reader_options,
store: self.store_ref(),
merge_iter_options: iter_options.clone(),
Expand Down
22 changes: 11 additions & 11 deletions analytic_engine/src/instance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::{
compaction::scheduler::CompactionSchedulerRef,
meta::Manifest,
space::{SpaceId, SpaceRef},
sst::file::FilePurger,
sst::{factory::FactoryRef as SstFactoryRef, file::FilePurger},
table::data::TableDataRef,
TableOptions,
};
Expand Down Expand Up @@ -85,7 +85,7 @@ impl Spaces {
}
}

pub struct SpaceStore<Wal, Meta, Fa> {
pub struct SpaceStore<Wal, Meta> {
/// All spaces of the engine.
spaces: RwLock<Spaces>,
/// Manifest (or meta) stores meta data of the engine instance.
Expand All @@ -95,19 +95,19 @@ pub struct SpaceStore<Wal, Meta, Fa> {
/// Sst storage.
store: ObjectStoreRef,
/// Sst factory.
sst_factory: Fa,
sst_factory: SstFactoryRef,

meta_cache: Option<MetaCacheRef>,
data_cache: Option<DataCacheRef>,
}

impl<Wal, Meta, Fa> Drop for SpaceStore<Wal, Meta, Fa> {
impl<Wal, Meta> Drop for SpaceStore<Wal, Meta> {
fn drop(&mut self) {
info!("SpaceStore dropped");
}
}

impl<Wal, Meta, Fa> SpaceStore<Wal, Meta, Fa> {
impl<Wal, Meta> SpaceStore<Wal, Meta> {
async fn close(&self) -> Result<()> {
let spaces = self.spaces.read().unwrap().list_all_spaces();
for space in spaces {
Expand All @@ -119,7 +119,7 @@ impl<Wal, Meta, Fa> SpaceStore<Wal, Meta, Fa> {
}
}

impl<Wal, Meta, Fa> SpaceStore<Wal, Meta, Fa> {
impl<Wal, Meta> SpaceStore<Wal, Meta> {
fn store_ref(&self) -> &ObjectStoreRef {
&self.store
}
Expand All @@ -142,9 +142,9 @@ impl<Wal, Meta, Fa> SpaceStore<Wal, Meta, Fa> {
///
/// Manages all spaces, also contains needed resources shared across all table
// TODO(yingwen): Track memory usage of all tables (or tables of space)
pub struct Instance<Wal, Meta, Fa> {
pub struct Instance<Wal, Meta> {
/// Space storage
space_store: Arc<SpaceStore<Wal, Meta, Fa>>,
space_store: Arc<SpaceStore<Wal, Meta>>,
/// Runtime to execute async tasks.
runtimes: Arc<EngineRuntimes>,
/// Global table options, overwrite mutable options in each table's
Expand All @@ -170,7 +170,7 @@ pub struct Instance<Wal, Meta, Fa> {
pub(crate) replay_batch_size: usize,
}

impl<Wal, Meta, Fa> Instance<Wal, Meta, Fa> {
impl<Wal, Meta> Instance<Wal, Meta> {
/// Close the instance gracefully.
pub async fn close(&self) -> Result<()> {
self.file_purger.stop().await.context(StopFilePurger)?;
Expand All @@ -185,7 +185,7 @@ impl<Wal, Meta, Fa> Instance<Wal, Meta, Fa> {
}

// TODO(yingwen): Instance builder
impl<Wal: WalManager + Send + Sync, Meta: Manifest, Fa> Instance<Wal, Meta, Fa> {
impl<Wal: WalManager + Send + Sync, Meta: Manifest> Instance<Wal, Meta> {
/// Find space using read lock
fn get_space_by_read_lock(&self, space: SpaceId) -> Option<SpaceRef> {
let spaces = self.space_store.spaces.read().unwrap();
Expand Down Expand Up @@ -222,4 +222,4 @@ impl<Wal: WalManager + Send + Sync, Meta: Manifest, Fa> Instance<Wal, Meta, Fa>
}

/// Instance reference
pub type InstanceRef<Wal, Meta, Fa> = Arc<Instance<Wal, Meta, Fa>>;
pub type InstanceRef<Wal, Meta> = Arc<Instance<Wal, Meta>>;
7 changes: 3 additions & 4 deletions analytic_engine/src/instance/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,22 @@ use crate::{
meta::{meta_data::TableManifestData, Manifest},
payload::{ReadPayload, WalDecoder},
space::{Space, SpaceId, SpaceRef},
sst::{factory::Factory, file::FilePurger},
sst::{factory::FactoryRef as SstFactoryRef, file::FilePurger},
table::data::{TableData, TableDataRef},
};

impl<Wal, Meta, Fa> Instance<Wal, Meta, Fa>
impl<Wal, Meta> Instance<Wal, Meta>
where
Wal: WalManager + Send + Sync + 'static,
Meta: Manifest + Send + Sync + 'static,
Fa: Factory + Send + Sync + 'static,
{
/// Open a new instance
pub async fn open(
ctx: OpenContext,
manifest: Meta,
wal_manager: Wal,
store: ObjectStoreRef,
sst_factory: Fa,
sst_factory: SstFactoryRef,
) -> Result<Arc<Self>> {
let space_store = Arc::new(SpaceStore {
spaces: RwLock::new(Spaces::default()),
Expand Down
8 changes: 4 additions & 4 deletions analytic_engine/src/instance/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::{
IterOptions, RecordBatchWithKeyIterator,
},
space::SpaceAndTable,
sst::factory::{Factory, SstReaderOptions},
sst::factory::SstReaderOptions,
table::{
data::TableData,
version::{ReadView, TableVersion},
Expand Down Expand Up @@ -75,7 +75,7 @@ fn need_merge_sort_streams(table_options: &TableOptions, read_request: &ReadRequ
table_options.need_dedup() || read_request.order.is_in_order()
}

impl<Wal: WalManager + Send + Sync, Meta: Manifest, Fa: Factory> Instance<Wal, Meta, Fa> {
impl<Wal: WalManager + Send + Sync, Meta: Manifest> Instance<Wal, Meta> {
/// Read data in multiple time range from table, and return
/// `read_parallelism` output streams.
pub async fn partitioned_read_from_table(
Expand Down Expand Up @@ -178,7 +178,7 @@ impl<Wal: WalManager + Send + Sync, Meta: Manifest, Fa: Factory> Instance<Wal, M
sequence,
projected_schema: projected_schema.clone(),
predicate: request.predicate.clone(),
sst_factory: self.space_store.sst_factory.clone(),
sst_factory: &self.space_store.sst_factory,
sst_reader_options: sst_reader_options.clone(),
store: self.space_store.store_ref(),
merge_iter_options: iter_options.clone(),
Expand Down Expand Up @@ -239,7 +239,7 @@ impl<Wal: WalManager + Send + Sync, Meta: Manifest, Fa: Factory> Instance<Wal, M
projected_schema: projected_schema.clone(),
predicate: request.predicate.clone(),
sst_reader_options: sst_reader_options.clone(),
sst_factory: self.space_store.sst_factory.clone(),
sst_factory: &self.space_store.sst_factory,
store: self.space_store.store_ref(),
};
let builder = chain::Builder::new(chain_config);
Expand Down
Loading

0 comments on commit ca706a7

Please sign in to comment.