Skip to content

Commit

Permalink
fix: use right table manipulator when setup (#333)
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi authored Oct 21, 2022
1 parent f56046e commit 0a28ba6
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ server = { workspace = true }
signal-hook = "0.3"
table_engine = { workspace = true }
tracing_util = { workspace = true }
interpreters = { workspace = true }

[build-dependencies]
vergen = { version = "5", default-features = false, features = ["build", "git"] }
Expand Down
6 changes: 6 additions & 0 deletions interpreters/src/table_manipulator/meta_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ pub struct TableManipulatorImpl {
meta_client: MetaClientRef,
}

impl TableManipulatorImpl {
pub fn new(meta_client: MetaClientRef) -> Self {
Self { meta_client }
}
}

#[async_trait]
impl TableManipulator for TableManipulatorImpl {
async fn create_table(
Expand Down
16 changes: 12 additions & 4 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Arc;
use catalog::manager::ManagerRef;
use cluster::ClusterRef;
use df_operator::registry::FunctionRegistryRef;
use interpreters::table_manipulator::catalog_based;
use interpreters::table_manipulator::TableManipulatorRef;
use log::warn;
use query_engine::executor::Executor as QueryExecutor;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
Expand Down Expand Up @@ -45,6 +45,9 @@ pub enum Error {
#[snafu(display("Missing table engine.\nBacktrace:\n{}", backtrace))]
MissingTableEngine { backtrace: Backtrace },

#[snafu(display("Missing table manipulator.\nBacktrace:\n{}", backtrace))]
MissingTableManipulator { backtrace: Backtrace },

#[snafu(display("Missing function registry.\nBacktrace:\n{}", backtrace))]
MissingFunctionRegistry { backtrace: Backtrace },

Expand Down Expand Up @@ -141,6 +144,7 @@ pub struct Builder<Q> {
catalog_manager: Option<ManagerRef>,
query_executor: Option<Q>,
table_engine: Option<TableEngineRef>,
table_manipulator: Option<TableManipulatorRef>,
function_registry: Option<FunctionRegistryRef>,
limiter: Limiter,
cluster: Option<ClusterRef>,
Expand All @@ -156,6 +160,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
catalog_manager: None,
query_executor: None,
table_engine: None,
table_manipulator: None,
function_registry: None,
limiter: Limiter::default(),
cluster: None,
Expand Down Expand Up @@ -184,6 +189,11 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
self
}

pub fn table_manipulator(mut self, val: TableManipulatorRef) -> Self {
self.table_manipulator = Some(val);
self
}

pub fn function_registry(mut self, val: FunctionRegistryRef) -> Self {
self.function_registry = Some(val);
self
Expand Down Expand Up @@ -218,12 +228,10 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
let catalog_manager = self.catalog_manager.context(MissingCatalogManager)?;
let query_executor = self.query_executor.context(MissingQueryExecutor)?;
let table_engine = self.table_engine.context(MissingTableEngine)?;
let table_manipulator = self.table_manipulator.context(MissingTableManipulator)?;
let function_registry = self.function_registry.context(MissingFunctionRegistry)?;

let instance = {
let table_manipulator = Arc::new(catalog_based::TableManipulatorImpl::new(
catalog_manager.clone(),
));
let instance = Instance {
catalog_manager,
query_executor,
Expand Down
13 changes: 11 additions & 2 deletions src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use catalog_impls::{table_based::TableBasedManager, volatile, CatalogManagerImpl
use cluster::{cluster_impl::ClusterImpl, shard_tables_cache::ShardTablesCache};
use common_util::runtime;
use df_operator::registry::FunctionRegistryImpl;
use interpreters::table_manipulator::{catalog_based, meta_based};
use log::info;
use logger::RuntimeLevel;
use meta_client::meta_impl;
Expand Down Expand Up @@ -165,12 +166,16 @@ async fn build_in_cluster_mode<Q: Executor + 'static>(
Arc::new(cluster_impl)
};

let catalog_manager = Arc::new(volatile::ManagerImpl::new(shard_tables_cache, meta_client));

let catalog_manager = Arc::new(volatile::ManagerImpl::new(
shard_tables_cache,
meta_client.clone(),
));
let table_manipulator = Arc::new(meta_based::TableManipulatorImpl::new(meta_client));
let router = Arc::new(ClusterBasedRouter::new(cluster.clone()));
let schema_config_provider = Arc::new(ClusterBasedProvider::new(cluster.clone()));
builder
.catalog_manager(catalog_manager)
.table_manipulator(table_manipulator)
.cluster(cluster)
.router(router)
.schema_config_provider(schema_config_provider)
Expand All @@ -188,6 +193,9 @@ async fn build_in_standalone_mode<Q: Executor + 'static>(

// Create catalog manager, use analytic table as backend
let catalog_manager = Arc::new(CatalogManagerImpl::new(Arc::new(table_based_manager)));
let table_manipulator = Arc::new(catalog_based::TableManipulatorImpl::new(
catalog_manager.clone(),
));

// Create schema in default catalog.
create_static_topology_schema(
Expand All @@ -207,6 +215,7 @@ async fn build_in_standalone_mode<Q: Executor + 'static>(

builder
.catalog_manager(catalog_manager)
.table_manipulator(table_manipulator)
.router(router)
.schema_config_provider(schema_config_provider)
}
Expand Down

0 comments on commit 0a28ba6

Please sign in to comment.