From 7103d3e7936a7002455bfe18f27c8c2d7352a376 Mon Sep 17 00:00:00 2001 From: kamille Date: Wed, 22 Mar 2023 18:01:46 +0800 Subject: [PATCH 1/2] move table engine proxy to table_engine crate. --- catalog_impls/src/table_based.rs | 3 +- server/src/handlers/influxdb.rs | 2 + server/src/lib.rs | 1 - src/setup.rs | 3 +- table_engine/src/lib.rs | 1 + table_engine/src/memory.rs | 47 +++++++++++++- .../src/proxy.rs | 62 +++++-------------- 7 files changed, 65 insertions(+), 54 deletions(-) rename server/src/table_engine.rs => table_engine/src/proxy.rs (59%) diff --git a/catalog_impls/src/table_based.rs b/catalog_impls/src/table_based.rs index 03cc1dd7bc..9c42ca52b3 100644 --- a/catalog_impls/src/table_based.rs +++ b/catalog_impls/src/table_based.rs @@ -912,9 +912,10 @@ mod tests { schema::{CreateOptions, CreateTableRequest, DropOptions, DropTableRequest, SchemaRef}, }; use common_types::table::{DEFAULT_CLUSTER_VERSION, DEFAULT_SHARD_ID}; - use server::table_engine::{MemoryTableEngine, TableEngineProxy}; use table_engine::{ engine::{TableEngineRef, TableState}, + memory::MemoryTableEngine, + proxy::TableEngineProxy, ANALYTIC_ENGINE_TYPE, }; diff --git a/server/src/handlers/influxdb.rs b/server/src/handlers/influxdb.rs index 8a057a2e5c..4035dbda1c 100644 --- a/server/src/handlers/influxdb.rs +++ b/server/src/handlers/influxdb.rs @@ -226,6 +226,8 @@ fn convert_influx_value(field_value: FieldValue) -> Value { Value { value: Some(v) } } +// fn convert_query_result(output: Output) + // TODO: Request and response type don't match influxdb's API now. pub async fn query( ctx: RequestContext, diff --git a/server/src/lib.rs b/server/src/lib.rs index e684b8e947..30a5d619cd 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -22,4 +22,3 @@ mod metrics; mod mysql; pub mod schema_config_provider; pub mod server; -pub mod table_engine; diff --git a/src/setup.rs b/src/setup.rs index 682bc16a42..c45f048683 100644 --- a/src/setup.rs +++ b/src/setup.rs @@ -30,9 +30,8 @@ use server::{ cluster_based::ClusterBasedProvider, config_based::ConfigBasedProvider, }, server::Builder, - table_engine::{MemoryTableEngine, TableEngineProxy}, }; -use table_engine::engine::EngineRuntimes; +use table_engine::{engine::EngineRuntimes, memory::MemoryTableEngine, proxy::TableEngineProxy}; use tracing_util::{ self, tracing_appender::{non_blocking::WorkerGuard, rolling::Rotation}, diff --git a/table_engine/src/lib.rs b/table_engine/src/lib.rs index 3ec6e0c384..9874c6c239 100644 --- a/table_engine/src/lib.rs +++ b/table_engine/src/lib.rs @@ -10,6 +10,7 @@ pub mod memory; pub mod partition; pub mod predicate; pub mod provider; +pub mod proxy; pub mod remote; pub mod stream; pub mod table; diff --git a/table_engine/src/memory.rs b/table_engine/src/memory.rs index 61f2e6df78..ce06840b3e 100644 --- a/table_engine/src/memory.rs +++ b/table_engine/src/memory.rs @@ -1,6 +1,6 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -//! In-memory table implementations +//! In-memory table engine implementations use std::{ collections::HashMap, @@ -23,14 +23,18 @@ use futures::stream::Stream; use snafu::{OptionExt, ResultExt}; use crate::{ + engine::{ + CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, TableEngine, + }, stream::{ self, ErrNoSource, ErrWithSource, PartitionedStreams, RecordBatchStream, SendableRecordBatchStream, }, table::{ AlterSchemaRequest, FlushRequest, GetRequest, ReadRequest, Result, Table, TableId, - TableStats, UnsupportedMethod, WriteRequest, + TableRef, TableStats, UnsupportedMethod, WriteRequest, }, + MEMORY_ENGINE_TYPE, }; type RowGroupVec = Vec; @@ -250,3 +254,42 @@ fn build_column_block<'a, I: Iterator>( } Ok(builder.build()) } + +/// Memory table engine implementation +// Mainly for test purpose now +pub struct MemoryTableEngine; + +#[async_trait] +impl TableEngine for MemoryTableEngine { + fn engine_type(&self) -> &str { + MEMORY_ENGINE_TYPE + } + + async fn close(&self) -> crate::engine::Result<()> { + Ok(()) + } + + async fn create_table(&self, request: CreateTableRequest) -> crate::engine::Result { + Ok(Arc::new(MemoryTable::new( + request.table_name, + request.table_id, + request.table_schema, + MEMORY_ENGINE_TYPE.to_string(), + ))) + } + + async fn drop_table(&self, _request: DropTableRequest) -> crate::engine::Result { + Ok(true) + } + + async fn open_table( + &self, + _request: OpenTableRequest, + ) -> crate::engine::Result> { + Ok(None) + } + + async fn close_table(&self, _request: CloseTableRequest) -> crate::engine::Result<()> { + Ok(()) + } +} diff --git a/server/src/table_engine.rs b/table_engine/src/proxy.rs similarity index 59% rename from server/src/table_engine.rs rename to table_engine/src/proxy.rs index 8c8cfaeef3..b235d79365 100644 --- a/server/src/table_engine.rs +++ b/table_engine/src/proxy.rs @@ -1,56 +1,19 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -//! Table engine implementation - -use std::sync::Arc; +//! Table engine proxy use async_trait::async_trait; -use table_engine::{ + +use crate::{ engine::{ - CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, Result, - TableEngine, TableEngineRef, UnknownEngineType, + CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, TableEngine, + TableEngineRef, UnknownEngineType, }, - memory::MemoryTable, + memory::MemoryTableEngine, table::TableRef, ANALYTIC_ENGINE_TYPE, MEMORY_ENGINE_TYPE, }; -/// Memory table engine implementation -// Mainly for test purpose now -pub struct MemoryTableEngine; - -#[async_trait] -impl TableEngine for MemoryTableEngine { - fn engine_type(&self) -> &str { - MEMORY_ENGINE_TYPE - } - - async fn close(&self) -> Result<()> { - Ok(()) - } - - async fn create_table(&self, request: CreateTableRequest) -> Result { - Ok(Arc::new(MemoryTable::new( - request.table_name, - request.table_id, - request.table_schema, - MEMORY_ENGINE_TYPE.to_string(), - ))) - } - - async fn drop_table(&self, _request: DropTableRequest) -> Result { - Ok(true) - } - - async fn open_table(&self, _request: OpenTableRequest) -> Result> { - Ok(None) - } - - async fn close_table(&self, _request: CloseTableRequest) -> Result<()> { - Ok(()) - } -} - /// Route [CreateTableRequest] to the correct engine by its engine type pub struct TableEngineProxy { /// Memory table engine @@ -65,14 +28,14 @@ impl TableEngine for TableEngineProxy { "TableEngineProxy" } - async fn close(&self) -> Result<()> { + async fn close(&self) -> crate::engine::Result<()> { self.memory.close().await?; self.analytic.close().await?; Ok(()) } - async fn create_table(&self, request: CreateTableRequest) -> Result { + async fn create_table(&self, request: CreateTableRequest) -> crate::engine::Result { // TODO(yingwen): Use a map match request.engine.as_str() { MEMORY_ENGINE_TYPE => self.memory.create_table(request).await, @@ -81,7 +44,7 @@ impl TableEngine for TableEngineProxy { } } - async fn drop_table(&self, request: DropTableRequest) -> Result { + async fn drop_table(&self, request: DropTableRequest) -> crate::engine::Result { match request.engine.as_str() { MEMORY_ENGINE_TYPE => self.memory.drop_table(request).await, ANALYTIC_ENGINE_TYPE => self.analytic.drop_table(request).await, @@ -90,7 +53,10 @@ impl TableEngine for TableEngineProxy { } /// Open table, return error if table not exists - async fn open_table(&self, request: OpenTableRequest) -> Result> { + async fn open_table( + &self, + request: OpenTableRequest, + ) -> crate::engine::Result> { match request.engine.as_str() { MEMORY_ENGINE_TYPE => self.memory.open_table(request).await, ANALYTIC_ENGINE_TYPE => self.analytic.open_table(request).await, @@ -99,7 +65,7 @@ impl TableEngine for TableEngineProxy { } /// Close table, it is ok to close a closed table. - async fn close_table(&self, request: CloseTableRequest) -> Result<()> { + async fn close_table(&self, request: CloseTableRequest) -> crate::engine::Result<()> { match request.engine.as_str() { MEMORY_ENGINE_TYPE => self.memory.close_table(request).await, ANALYTIC_ENGINE_TYPE => self.analytic.close_table(request).await, From 89fe8f8ff2e1141bfd3e05d2925bf397b6512c6d Mon Sep 17 00:00:00 2001 From: kamille Date: Wed, 22 Mar 2023 19:01:09 +0800 Subject: [PATCH 2/2] address CR. --- Cargo.lock | 1 - catalog_impls/Cargo.toml | 1 - interpreters/Cargo.toml | 1 + server/src/handlers/influxdb.rs | 2 -- 4 files changed, 1 insertion(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2e963e3eb6..63aa41e8a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -985,7 +985,6 @@ dependencies = [ "common_util", "log", "meta_client", - "server", "snafu 0.6.10", "system_catalog", "table_engine", diff --git a/catalog_impls/Cargo.toml b/catalog_impls/Cargo.toml index 368f583012..156d696a29 100644 --- a/catalog_impls/Cargo.toml +++ b/catalog_impls/Cargo.toml @@ -25,4 +25,3 @@ tokio = { workspace = true } [dev-dependencies] analytic_engine = { workspace = true, features = ["test"] } -server = { workspace = true } diff --git a/interpreters/Cargo.toml b/interpreters/Cargo.toml index fca00b79f2..e900e98da5 100644 --- a/interpreters/Cargo.toml +++ b/interpreters/Cargo.toml @@ -29,6 +29,7 @@ regex = "1" snafu = { workspace = true } sql = { workspace = true } table_engine = { workspace = true } + [dev-dependencies] analytic_engine = { workspace = true, features = ["test"] } catalog_impls = { workspace = true } diff --git a/server/src/handlers/influxdb.rs b/server/src/handlers/influxdb.rs index 4035dbda1c..8a057a2e5c 100644 --- a/server/src/handlers/influxdb.rs +++ b/server/src/handlers/influxdb.rs @@ -226,8 +226,6 @@ fn convert_influx_value(field_value: FieldValue) -> Value { Value { value: Some(v) } } -// fn convert_query_result(output: Output) - // TODO: Request and response type don't match influxdb's API now. pub async fn query( ctx: RequestContext,