From 3b21fd1d28e45300a07f4fc2c85c3fc4f87d8ded Mon Sep 17 00:00:00 2001 From: kamille <34352236+Rachelint@users.noreply.github.com> Date: Wed, 28 Dec 2022 17:04:54 +0800 Subject: [PATCH] feat: impl remote sdk (#509) * move router from server to a separate crate. * move avro util to common_util. * impl remote sdk. * modify global Cargo.toml and Cargo.lock. * rename some variables. * address CR. --- Cargo.lock | 39 +++- Cargo.toml | 6 + cluster/Cargo.toml | 1 - common_types/src/projected_schema.rs | 18 ++ common_types/src/request_id.rs | 5 + common_util/src/avro.rs | 134 +++++++++++- components/object_store/Cargo.toml | 2 +- remote_engine_client/Cargo.toml | 25 +++ remote_engine_client/src/channel.rs | 96 +++++++++ remote_engine_client/src/client.rs | 202 ++++++++++++++++++ remote_engine_client/src/config.rs | 27 +++ remote_engine_client/src/lib.rs | 174 +++++++++++++++ remote_engine_client/src/status_code.rs | 29 +++ router/Cargo.toml | 24 +++ .../src/route => router/src}/cluster_based.rs | 3 +- router/src/endpoint.rs | 67 ++++++ {server/src/route => router/src}/hash.rs | 0 server/src/route/mod.rs => router/src/lib.rs | 11 +- .../src/route => router/src}/rule_based.rs | 5 +- server/Cargo.toml | 3 +- server/src/config.rs | 73 +------ server/src/grpc/forward.rs | 11 +- server/src/grpc/mod.rs | 3 +- server/src/grpc/storage_service/error.rs | 14 +- server/src/grpc/storage_service/mod.rs | 2 +- server/src/grpc/storage_service/query.rs | 14 +- server/src/http.rs | 2 +- server/src/lib.rs | 1 - server/src/server.rs | 4 +- src/setup.rs | 8 +- table_engine/src/predicate.rs | 31 ++- table_engine/src/remote/mock_impl.rs | 19 -- table_engine/src/remote/mod.rs | 39 +--- table_engine/src/remote/model.rs | 138 ++++++++++-- table_engine/src/table.rs | 39 +++- 35 files changed, 1085 insertions(+), 184 deletions(-) create mode 100644 remote_engine_client/Cargo.toml create mode 100644 remote_engine_client/src/channel.rs create mode 100644 remote_engine_client/src/client.rs create mode 100644 remote_engine_client/src/config.rs create mode 100644 remote_engine_client/src/lib.rs create mode 100644 remote_engine_client/src/status_code.rs create mode 100644 router/Cargo.toml rename {server/src/route => router/src}/cluster_based.rs (98%) create mode 100644 router/src/endpoint.rs rename {server/src/route => router/src}/hash.rs (100%) rename server/src/route/mod.rs => router/src/lib.rs (97%) rename {server/src/route => router/src}/rule_based.rs (98%) delete mode 100644 table_engine/src/remote/mock_impl.rs diff --git a/Cargo.lock b/Cargo.lock index aa6026c0d6..4043ae2865 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -832,6 +832,7 @@ dependencies = [ "logger", "meta_client", "query_engine", + "router", "server", "signal-hook", "sort", @@ -1017,7 +1018,6 @@ checksum = "b8191fa7302e03607ff0e237d4246cc043ff5b3cb9409d995172ba3bea16b807" name = "cluster" version = "1.0.0-alpha01" dependencies = [ - "analytic_engine", "async-trait", "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=b9c45bcdbf7d55d5889d42b4c8017282819e6049)", "common_types 1.0.0-alpha01", @@ -4815,6 +4815,24 @@ version = "0.6.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244" +[[package]] +name = "remote_engine_client" +version = "1.0.0-alpha01" +dependencies = [ + "async-trait", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=b9c45bcdbf7d55d5889d42b4c8017282819e6049)", + "clru", + "common_types 1.0.0-alpha01",
"common_util", + "futures 0.3.21", + "proto 1.0.0-alpha01", + "router", + "snafu 0.6.10", + "table_engine", + "tokio 1.22.0", + "tonic", +] + [[package]] name = "remove_dir_all" version = "0.5.3" @@ -4929,6 +4947,23 @@ dependencies = [ "librocksdb_sys", ] +[[package]] +name = "router" +version = "1.0.0-alpha01" +dependencies = [ + "async-trait", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=b9c45bcdbf7d55d5889d42b4c8017282819e6049)", + "cluster", + "common_types 1.0.0-alpha01", + "common_util", + "log", + "meta_client", + "serde", + "serde_derive", + "snafu 0.6.10", + "twox-hash", +] + [[package]] name = "rskafka" version = "0.3.0" @@ -5266,6 +5301,7 @@ dependencies = [ "prost", "proto 1.0.0-alpha01", "query_engine", + "router", "serde", "serde_derive", "serde_json", @@ -5276,7 +5312,6 @@ dependencies = [ "tokio 1.22.0", "tokio-stream", "tonic", - "twox-hash", "warp", ] diff --git a/Cargo.toml b/Cargo.toml index 9bddd8aa2e..7d70bcdcb2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,8 @@ members = [ "meta_client", "proto", "query_engine", + "remote_engine_client", + "router", "server", "sql", "system_catalog", @@ -61,6 +63,7 @@ catalog = { path = "catalog" } catalog_impls = { path = "catalog_impls" } chrono = "0.4" clap = "3.0" +clru = "0.6.1" cluster = { path = "cluster" } criterion = "0.3" common_types = { path = "common_types" } @@ -86,6 +89,8 @@ proto = { path = "proto" } prost = "0.11" query_engine = { path = "query_engine" } rand = "0.7" +remote_engine_client = { path = "remote_engine_client" } +router = { path = "router" } snafu = { version = "0.6.10", features = ["backtraces"] } serde = "1.0" serde_derive = "1.0" @@ -133,6 +138,7 @@ log = { workspace = true } logger = { workspace = true } meta_client = { workspace = true } query_engine = { workspace = true } +router = { workspace = true } server = { workspace = true } signal-hook = "0.3" sort = "0.8.5" diff --git a/cluster/Cargo.toml b/cluster/Cargo.toml index e68c392a44..9c5b3e0cdc 100644 --- a/cluster/Cargo.toml +++ b/cluster/Cargo.toml @@ -11,7 +11,6 @@ workspace = true workspace = true [dependencies] -analytic_engine = { workspace = true } async-trait = { workspace = true } ceresdbproto = { workspace = true } common_types = { workspace = true } diff --git a/common_types/src/projected_schema.rs b/common_types/src/projected_schema.rs index b76822fb42..60344bab0d 100644 --- a/common_types/src/projected_schema.rs +++ b/common_types/src/projected_schema.rs @@ -155,6 +155,24 @@ impl ProjectedSchema { } } +impl From for proto::common::ProjectedSchema { + fn from(request: ProjectedSchema) -> Self { + let table_schema_pb = (&request.0.original_schema).into(); + let projection_pb = request.0.projection.as_ref().map(|project| { + let project = project + .iter() + .map(|one_project| *one_project as u64) + .collect::>(); + proto::common::Projection { idx: project } + }); + + Self { + table_schema: Some(table_schema_pb), + projection: projection_pb, + } + } +} + impl TryFrom for ProjectedSchema { type Error = Error; diff --git a/common_types/src/request_id.rs b/common_types/src/request_id.rs index 6990839818..3f2adf01a8 100644 --- a/common_types/src/request_id.rs +++ b/common_types/src/request_id.rs @@ -19,6 +19,11 @@ impl RequestId { Self(id) } + + #[inline] + pub fn as_u64(&self) -> u64 { + self.0 + } } impl fmt::Display for RequestId { diff --git a/common_util/src/avro.rs b/common_util/src/avro.rs index 187e8f965c..9c37c7fe07 100644 --- a/common_util/src/avro.rs +++ b/common_util/src/avro.rs @@ -11,7 
+11,7 @@ use avro_rs::{ }; use common_types::{ bytes::{ByteVec, Bytes}, - column::ColumnBlock, + column::{ColumnBlock, ColumnBlockBuilder}, datum::{Datum, DatumKind}, record_batch::RecordBatch, row::{Row, RowGroup, RowGroupBuilder}, @@ -19,7 +19,7 @@ use common_types::{ string::StringBytes, time::Timestamp, }; -use snafu::{Backtrace, ResultExt, Snafu}; +use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; /// Schema name of the record const RECORD_NAME: &str = "Result"; @@ -48,6 +48,43 @@ pub enum Error { ))] InvalidAvroRecord { value: Value, backtrace: Backtrace }, + #[snafu(display( + "Failed to convert avro rows to record batch, msg:{}, err:{}", + msg, + source + ))] + AvroRowsToRecordBatch { + msg: String, + source: Box, + }, + + #[snafu(display( + "Failed to convert avro rows to row group, msg:{}, err:{}", + msg, + source + ))] + AvroRowsToRowGroup { + msg: String, + source: Box, + }, + + #[snafu(display( + "Failed to convert row group to avro rows with no cause, msg:{}.\nBacktrace:\n{}", + msg, + backtrace + ))] + RowGroupToAvroRowsNoCause { msg: String, backtrace: Backtrace }, + + #[snafu(display( + "Failed to convert row group to avro rows with cause, msg:{}, err:{}", + msg, + source + ))] + RowGroupToAvroRowsWithCause { + msg: String, + source: Box, + }, + #[snafu(display("Unsupported arvo type, value:{:?}.\nBacktrace:\n{}", value, backtrace))] UnsupportedType { value: Value, backtrace: Backtrace }, } @@ -113,6 +150,58 @@ pub fn record_batch_to_avro_rows(record_batch: &RecordBatch) -> Result>, + record_schema: RecordSchema, +) -> Result { + let avro_schema = to_avro_schema(RECORD_NAME, &record_schema); + + // Collect datums and append to `ColumnBlockBuilder`s. + let mut row_buf = Vec::with_capacity(record_schema.num_columns()); + let mut column_block_builders = record_schema + .columns() + .iter() + .map(|col_schema| ColumnBlockBuilder::new(&col_schema.data_type)) + .collect::>(); + + for raw in raws { + row_buf.clear(); + avro_row_to_row(&avro_schema, &raw, &mut row_buf) + .map_err(|e| Box::new(e) as _) + .context(AvroRowsToRecordBatch { + msg: format!( + "parse avro raw to row failed, avro schema:{:?}, raw:{:?}", + avro_schema, raw + ), + })?; + assert_eq!(row_buf.len(), column_block_builders.len()); + + for (col_idx, datum) in row_buf.iter().enumerate() { + let column_block_builder = column_block_builders.get_mut(col_idx).unwrap(); + column_block_builder + .append(datum.clone()) + .map_err(|e| Box::new(e) as _) + .context(AvroRowsToRecordBatch { + msg: format!( + "append datum to column block builder failed, datum:{:?}, builder:{:?}", + datum, column_block_builder + ), + })? + } + } + + // Build `RecordBatch`. 
+ let column_blocks = column_block_builders + .into_iter() + .map(|mut builder| builder.build()) + .collect::>(); + RecordBatch::new(record_schema, column_blocks) + .map_err(|e| Box::new(e) as _) + .context(AvroRowsToRecordBatch { + msg: "build record batch failed", + }) +} + pub fn avro_rows_to_row_group(schema: Schema, rows: &[Vec]) -> Result { let avro_schema = to_avro_schema(RECORD_NAME, &schema.to_record_schema()); let mut builder = RowGroupBuilder::with_capacity(schema.clone(), rows.len()); @@ -125,6 +214,43 @@ pub fn avro_rows_to_row_group(schema: Schema, rows: &[Vec]) -> Result Result>> { + let record_schema = row_group.schema().to_record_schema(); + let column_schemas = record_schema.columns(); + let avro_schema = to_avro_schema(RECORD_NAME, &record_schema); + + let mut rows = Vec::with_capacity(row_group.num_rows()); + let row_len = row_group.num_rows(); + for row_idx in 0..row_len { + // Convert `Row` to `Record` in avro. + let row = row_group.get_row(row_idx).unwrap(); + let mut avro_record = Record::new(&avro_schema).context(RowGroupToAvroRowsNoCause { + msg: format!( + "new avro record with schema failed, schema:{:?}", + avro_schema + ), + })?; + + for (col_idx, column_schema) in column_schemas.iter().enumerate() { + let column_value = row[col_idx].clone(); + let avro_value = datum_to_avro_value(column_value, column_schema.is_nullable); + avro_record.put(&column_schema.name, avro_value); + } + + let row_bytes = avro_rs::to_avro_datum(&avro_schema, avro_record) + .map_err(|e| Box::new(e) as _) + .context(RowGroupToAvroRowsWithCause { + msg: format!( + "encode avro record to bytes failed, schema:{:?}", + avro_schema + ), + })?; + rows.push(row_bytes); + } + + Ok(rows) +} + fn data_type_to_schema(data_type: &DatumKind) -> avro_rs::Schema { match data_type { DatumKind::Null => avro_rs::Schema::Null, @@ -178,6 +304,10 @@ fn record_batch_to_avro( /// Panic if row_idx is out of bound.
fn column_to_value(array: &ColumnBlock, row_idx: usize, is_nullable: bool) -> Value { let datum = array.datum(row_idx); + datum_to_avro_value(datum, is_nullable) +} + +pub fn datum_to_avro_value(datum: Datum, is_nullable: bool) -> Value { match datum { Datum::Null => may_union(Value::Null, is_nullable), Datum::Timestamp(v) => may_union(Value::TimestampMillis(v.as_i64()), is_nullable), diff --git a/components/object_store/Cargo.toml b/components/object_store/Cargo.toml index 1b5cf2c977..7736444a77 100644 --- a/components/object_store/Cargo.toml +++ b/components/object_store/Cargo.toml @@ -14,7 +14,7 @@ workspace = true async-trait = { workspace = true } bytes = { workspace = true } chrono = { workspace = true } -clru = "0.6.1" +clru = { workspace = true } common_util = { workspace = true } crc = "3.0.0" futures = { workspace = true } diff --git a/remote_engine_client/Cargo.toml b/remote_engine_client/Cargo.toml new file mode 100644 index 0000000000..65ab55936b --- /dev/null +++ b/remote_engine_client/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "remote_engine_client" + +[package.version] +workspace = true + +[package.authors] +workspace = true + +[package.edition] +workspace = true + +[dependencies] +async-trait = { workspace = true } +ceresdbproto = { workspace = true } +clru = { workspace = true } +common_types = { workspace = true } +common_util = { workspace = true } +futures = { workspace = true } +proto = { workspace = true } +router = { workspace = true } +snafu = { workspace = true } +table_engine = { workspace = true } +tokio = { workspace = true } +tonic = { workspace = true } diff --git a/remote_engine_client/src/channel.rs b/remote_engine_client/src/channel.rs new file mode 100644 index 0000000000..1f3836d088 --- /dev/null +++ b/remote_engine_client/src/channel.rs @@ -0,0 +1,96 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Channel pool + +use std::num::NonZeroUsize; + +use clru::CLruCache; +use router::endpoint::Endpoint; +use snafu::ResultExt; +use tokio::sync::Mutex; +use tonic::transport::{Channel, Endpoint as TonicEndpoint}; + +use super::config::Config; +use crate::error::*; + +/// Pool for reusing the built channel +pub struct ChannelPool { + /// Channels in pool + // TODO: should be replaced with a cache(like "moka") + // or partition the lock. + channels: Mutex>, + + /// Channel builder + builder: ChannelBuilder, +} + +impl ChannelPool { + pub fn new(config: Config) -> Self { + let channels = Mutex::new(CLruCache::new( + NonZeroUsize::new(config.channel_pool_max_size).unwrap(), + )); + let builder = ChannelBuilder::new(config); + + Self { channels, builder } + } + + pub async fn get(&self, endpoint: &Endpoint) -> Result { + { + let mut inner = self.channels.lock().await; + if let Some(channel) = inner.get(endpoint) { + return Ok(channel.clone()); + } + } + + let mut inner = self.channels.lock().await; + // Double check here. 
+ if let Some(channel) = inner.get(endpoint) { + return Ok(channel.clone()); + } + + let channel = self + .builder + .build(endpoint.clone().to_string().as_str()) + .await?; + inner.put(endpoint.clone(), channel.clone()); + + Ok(channel) + } +} + +/// Channel builder +struct ChannelBuilder { + config: Config, +} + +impl ChannelBuilder { + fn new(config: Config) -> Self { + Self { config } + } + + async fn build(&self, endpoint: &str) -> Result { + let formatted_endpoint = make_formatted_endpoint(endpoint); + let configured_endpoint = + TonicEndpoint::from_shared(formatted_endpoint.clone()).context(BuildChannel { + addr: formatted_endpoint.clone(), + msg: "invalid endpoint", + })?; + + let configured_endpoint = configured_endpoint + .connect_timeout(self.config.connect_timeout.0) + .keep_alive_timeout(self.config.channel_keep_alive_timeout.0) + .http2_keep_alive_interval(self.config.channel_keep_alive_interval.0) + .keep_alive_while_idle(true); + + let channel = configured_endpoint.connect().await.context(BuildChannel { + addr: formatted_endpoint.clone(), + msg: "connect failed", + })?; + + Ok(channel) + } +} + +fn make_formatted_endpoint(endpoint: &str) -> String { + format!("http://{}", endpoint) +} diff --git a/remote_engine_client/src/client.rs b/remote_engine_client/src/client.rs new file mode 100644 index 0000000000..c3f66e7dbd --- /dev/null +++ b/remote_engine_client/src/client.rs @@ -0,0 +1,202 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Client for accessing remote table engine + +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use ceresdbproto::storage; +use common_types::{ + projected_schema::ProjectedSchema, record_batch::RecordBatch, schema::RecordSchema, +}; +use common_util::avro; +use futures::{Stream, StreamExt}; +use proto::remote_engine::{self, remote_engine_service_client::*}; +use router::{endpoint::Endpoint, RouterRef}; +use snafu::{OptionExt, ResultExt}; +use table_engine::remote::model::{ReadRequest, TableIdentifier, WriteRequest}; +use tonic::{transport::Channel, Request, Streaming}; + +use crate::{channel::ChannelPool, config::Config, error::*, status_code}; + +pub struct Client { + channel_pool: ChannelPool, + router: RouterRef, +} + +impl Client { + pub fn new(config: Config, router: RouterRef) -> Self { + let channel_pool = ChannelPool::new(config); + + Self { + channel_pool, + router, + } + } + + pub async fn read(&self, request: ReadRequest) -> Result { + // Find the endpoint from router firstly. + let endpoint = self.route(&request.table).await?; + + // Read from remote. + let table_ident = request.table.clone(); + let projected_schema = request.read_request.projected_schema.clone(); + + let channel = self.channel_pool.get(&endpoint).await?; + let mut rpc_client = RemoteEngineServiceClient::::new(channel); + let request_pb = proto::remote_engine::ReadRequest::try_from(request) + .map_err(|e| Box::new(e) as _) + .context(ConvertReadRequest { + msg: "convert to pb failed", + })?; + + let result = rpc_client + .read(Request::new(request_pb)) + .await + .context(Rpc { + table_ident: table_ident.clone(), + msg: format!("read from remote failed, endpoint:{:?}", endpoint), + })?; + + let response = result.into_inner(); + let remote_read_record_batch_stream = + ClientReadRecordBatchStream::new(table_ident, response, projected_schema); + + Ok(remote_read_record_batch_stream) + } + + pub async fn write(&self, request: WriteRequest) -> Result { + // Find the endpoint from router firstly. 
+ let endpoint = self.route(&request.table).await?; + + // Write to remote. + let table_ident = request.table.clone(); + + let channel = self.channel_pool.get(&endpoint).await?; + let request_pb = proto::remote_engine::WriteRequest::try_from(request) + .map_err(|e| Box::new(e) as _) + .context(ConvertWriteRequest { + msg: "convert to pb failed", + })?; + let mut rpc_client = RemoteEngineServiceClient::::new(channel); + + let result = rpc_client + .write(Request::new(request_pb)) + .await + .context(Rpc { + table_ident: table_ident.clone(), + msg: format!("write to remote failed, endpoint:{:?}", endpoint), + })?; + + let response = result.into_inner(); + if let Some(header) = response.header && !status_code::is_ok(header.code) { + Server { + table_ident: table_ident.clone(), + code: header.code, + msg: header.error, + }.fail() + } else { + Ok(response.affected_rows as usize) + } + } + + async fn route(&self, table_ident: &TableIdentifier) -> Result { + let schema = &table_ident.schema; + let table = table_ident.table.clone(); + let route_request = storage::RouteRequest { + metrics: vec![table], + }; + let route_infos = + self.router + .route(schema, route_request) + .await + .context(RouteWithCause { + table_ident: table_ident.clone(), + })?; + + if route_infos.is_empty() { + return RouteNoCause { + table_ident: table_ident.clone(), + msg: "route infos is empty", + } + .fail(); + } + + // Get channel from pool. + let endpoint = route_infos + .first() + .unwrap() + .endpoint + .clone() + .context(RouteNoCause { + table_ident: table_ident.clone(), + msg: "no endpoint in route info", + })?; + + Ok(Endpoint::from(endpoint)) + } +} + +pub struct ClientReadRecordBatchStream { + pub table_ident: TableIdentifier, + pub response_stream: Streaming, + pub projected_schema: ProjectedSchema, + pub projected_record_schema: RecordSchema, +} + +impl ClientReadRecordBatchStream { + pub fn new( + table_ident: TableIdentifier, + response_stream: Streaming, + projected_schema: ProjectedSchema, + ) -> Self { + let projected_record_schema = projected_schema.to_record_schema(); + Self { + table_ident, + response_stream, + projected_schema, + projected_record_schema, + } + } +} + +impl Stream for ClientReadRecordBatchStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + match this.response_stream.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(response))) => { + // Check header. + if let Some(header) = response.header && !status_code::is_ok(header.code) { + return Poll::Ready(Some(Server { + table_ident: this.table_ident.clone(), + code: header.code, + msg: header.error, + }.fail())); + } + + // It's ok, try to convert rows to record batch and return. 
+ let record_schema = this.projected_schema.to_record_schema(); + let record_batch_result = + avro::avro_rows_to_record_batch(response.rows, record_schema) + .map_err(|e| Box::new(e) as _) + .context(ConvertReadResponse { + msg: "build record batch failed", + }); + Poll::Ready(Some(record_batch_result)) + } + + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e).context(Rpc { + table_ident: this.table_ident.clone(), + msg: "poll read response failed", + }))), + + Poll::Ready(None) => Poll::Ready(None), + + Poll::Pending => Poll::Pending, + } + } +} diff --git a/remote_engine_client/src/config.rs b/remote_engine_client/src/config.rs new file mode 100644 index 0000000000..e87f8fd652 --- /dev/null +++ b/remote_engine_client/src/config.rs @@ -0,0 +1,27 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Config for [Client] + +use std::str::FromStr; + +use common_util::config::ReadableDuration; + +pub struct Config { + pub connect_timeout: ReadableDuration, + pub channel_pool_max_size: usize, + pub channel_keep_alive_while_idle: bool, + pub channel_keep_alive_timeout: ReadableDuration, + pub channel_keep_alive_interval: ReadableDuration, +} + +impl Default for Config { + fn default() -> Self { + Self { + connect_timeout: ReadableDuration::from_str("3s").unwrap(), + channel_pool_max_size: 128, + channel_keep_alive_interval: ReadableDuration::from_str("600s").unwrap(), + channel_keep_alive_timeout: ReadableDuration::from_str("3s").unwrap(), + channel_keep_alive_while_idle: true, + } + } +} diff --git a/remote_engine_client/src/lib.rs b/remote_engine_client/src/lib.rs new file mode 100644 index 0000000000..a90435218e --- /dev/null +++ b/remote_engine_client/src/lib.rs @@ -0,0 +1,174 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! 
Remote table engine implementation + +mod channel; +mod client; +pub mod config; +mod status_code; + +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use async_trait::async_trait; +use common_types::{record_batch::RecordBatch, schema::RecordSchema}; +use config::Config; +use futures::{Stream, StreamExt}; +use router::RouterRef; +use snafu::ResultExt; +use table_engine::{ + remote::{ + self, + model::{ReadRequest, WriteRequest}, + RemoteEngine, + }, + stream::{self, ErrWithSource, RecordBatchStream, SendableRecordBatchStream}, +}; + +use self::client::{Client, ClientReadRecordBatchStream}; + +pub mod error { + use common_util::define_result; + use snafu::Snafu; + use table_engine::remote::model::TableIdentifier; + + #[derive(Debug, Snafu)] + #[snafu(visibility(pub))] + pub enum Error { + #[snafu(display("Failed to connect, addr:{}, msg:{}, err:{}", addr, msg, source))] + BuildChannel { + addr: String, + msg: String, + source: tonic::transport::Error, + }, + + #[snafu(display( + "Failed to convert request or response, table, msg:{}, err:{}", + msg, + source + ))] + ConvertReadRequest { + msg: String, + source: Box, + }, + + #[snafu(display( + "Failed to convert request or response, table, msg:{}, err:{}", + msg, + source + ))] + ConvertReadResponse { + msg: String, + source: Box, + }, + + #[snafu(display( + "Failed to convert request or response, table, msg:{}, err:{}", + msg, + source + ))] + ConvertWriteRequest { + msg: String, + source: Box, + }, + + #[snafu(display( + "Failed to connect, table_ident:{:?}, msg:{}, err:{}", + table_ident, + msg, + source + ))] + Rpc { + table_ident: TableIdentifier, + msg: String, + source: tonic::Status, + }, + + #[snafu(display( + "Failed to query from table in server, table_ident:{:?}, code:{}, msg:{}", + table_ident, + code, + msg + ))] + Server { + table_ident: TableIdentifier, + code: u32, + msg: String, + }, + + #[snafu(display("Failed to route table, table_ident:{:?}, err:{}", table_ident, source,))] + RouteWithCause { + table_ident: TableIdentifier, + source: router::Error, + }, + + #[snafu(display("Failed to route table, table_ident:{:?}, msg:{}", table_ident, msg,))] + RouteNoCause { + table_ident: TableIdentifier, + msg: String, + }, + } + + define_result!(Error); +} + +pub struct RemoteEngineImpl(Client); + +impl RemoteEngineImpl { + pub fn new(config: Config, router: RouterRef) -> Self { + let client = Client::new(config, router); + + Self(client) + } +} + +#[async_trait] +impl RemoteEngine for RemoteEngineImpl { + async fn read(&self, request: ReadRequest) -> remote::Result { + let client_read_stream = self + .0 + .read(request) + .await + .map_err(|e| Box::new(e) as _) + .context(remote::Read)?; + Ok(Box::pin(RemoteReadRecordBatchStream(client_read_stream))) + } + + async fn write(&self, request: WriteRequest) -> remote::Result { + self.0 + .write(request) + .await + .map_err(|e| Box::new(e) as _) + .context(remote::Write) + } +} + +struct RemoteReadRecordBatchStream(ClientReadRecordBatchStream); + +impl Stream for RemoteReadRecordBatchStream { + type Item = stream::Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + match this.0.poll_next_unpin(cx) { + Poll::Ready(Some(result)) => { + let result = result.map_err(|e| Box::new(e) as _).context(ErrWithSource { + msg: "poll read response failed", + }); + + Poll::Ready(Some(result)) + } + + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +impl RecordBatchStream for 
RemoteReadRecordBatchStream { + fn schema(&self) -> &RecordSchema { + &self.0.projected_record_schema + } +} diff --git a/remote_engine_client/src/status_code.rs b/remote_engine_client/src/status_code.rs new file mode 100644 index 0000000000..3fd46332c1 --- /dev/null +++ b/remote_engine_client/src/status_code.rs @@ -0,0 +1,29 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Remote service status code + +/// A set of codes for meta event service. +/// +/// Note that such a set of codes is different with the codes (alias to http +/// status code) used by storage service. +#[allow(dead_code)] +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub enum StatusCode { + #[default] + Ok = 0, + BadRequest = 401, + NotFound = 404, + Internal = 500, +} + +impl StatusCode { + #[inline] + pub fn as_u32(self) -> u32 { + self as u32 + } +} + +#[inline] +pub fn is_ok(code: u32) -> bool { + code == StatusCode::Ok.as_u32() +} diff --git a/router/Cargo.toml b/router/Cargo.toml new file mode 100644 index 0000000000..1ca818ed68 --- /dev/null +++ b/router/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "router" + +[package.version] +workspace = true + +[package.authors] +workspace = true + +[package.edition] +workspace = true + +[dependencies] +async-trait = { workspace = true } +ceresdbproto = { workspace = true } +cluster = { workspace = true } +common_types = { workspace = true } +common_util = { workspace = true } +log = { workspace = true } +meta_client = { workspace = true } +serde = { workspace = true } +serde_derive = { workspace = true } +snafu = { workspace = true } +twox-hash = "1.6" diff --git a/server/src/route/cluster_based.rs b/router/src/cluster_based.rs similarity index 98% rename from server/src/route/cluster_based.rs rename to router/src/cluster_based.rs index 40d43bd23e..4d68c2c436 100644 --- a/server/src/route/cluster_based.rs +++ b/router/src/cluster_based.rs @@ -11,8 +11,7 @@ use meta_client::types::{NodeShard, RouteTablesRequest, RouteTablesResponse}; use snafu::{OptionExt, ResultExt}; use crate::{ - config::Endpoint, - route::{hash, OtherNoCause, OtherWithCause, ParseEndpoint, Result, Router}, + endpoint::Endpoint, hash, OtherNoCause, OtherWithCause, ParseEndpoint, Result, Router, }; pub struct ClusterBasedRouter { diff --git a/router/src/endpoint.rs b/router/src/endpoint.rs new file mode 100644 index 0000000000..a7bbe7e1e5 --- /dev/null +++ b/router/src/endpoint.rs @@ -0,0 +1,67 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! 
Endpoint definition + +use std::str::FromStr; + +use ceresdbproto::storage; +use serde_derive::Deserialize; + +#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Hash)] +pub struct Endpoint { + pub addr: String, + pub port: u16, +} + +impl Endpoint { + pub fn new(addr: String, port: u16) -> Self { + Self { addr, port } + } +} + +impl ToString for Endpoint { + fn to_string(&self) -> String { + format!("{}:{}", self.addr, self.port) + } +} + +impl FromStr for Endpoint { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + let (addr, raw_port) = match s.rsplit_once(':') { + Some(v) => v, + None => { + let err_msg = "Can't find ':' in the source string".to_string(); + return Err(Self::Err::from(err_msg)); + } + }; + let port = raw_port.parse().map_err(|e| { + let err_msg = format!("Fail to parse port:{}, err:{}", raw_port, e); + Self::Err::from(err_msg) + })?; + + Ok(Endpoint { + addr: addr.to_string(), + port, + }) + } +} + +impl From for storage::Endpoint { + fn from(endpoint: Endpoint) -> Self { + storage::Endpoint { + ip: endpoint.addr, + port: endpoint.port as u32, + } + } +} + +impl From for Endpoint { + fn from(endpoint_pb: storage::Endpoint) -> Self { + Endpoint { + addr: endpoint_pb.ip, + port: endpoint_pb.port as u16, + } + } +} diff --git a/server/src/route/hash.rs b/router/src/hash.rs similarity index 100% rename from server/src/route/hash.rs rename to router/src/hash.rs diff --git a/server/src/route/mod.rs b/router/src/lib.rs similarity index 97% rename from server/src/route/mod.rs rename to router/src/lib.rs index 4da8b1bb2d..cf47990e22 100644 --- a/server/src/route/mod.rs +++ b/router/src/lib.rs @@ -1,15 +1,16 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -use std::sync::Arc; - -use async_trait::async_trait; -use ceresdbproto::storage::{Route, RouteRequest}; - pub mod cluster_based; +pub mod endpoint; pub(crate) mod hash; pub mod rule_based; +use std::sync::Arc; + +use async_trait::async_trait; +use ceresdbproto::storage::{Route, RouteRequest}; pub use cluster_based::ClusterBasedRouter; +use common_util::define_result; pub use rule_based::{RuleBasedRouter, RuleList}; use snafu::{Backtrace, Snafu}; diff --git a/server/src/route/rule_based.rs b/router/src/rule_based.rs similarity index 98% rename from server/src/route/rule_based.rs rename to router/src/rule_based.rs index b67f98417b..8adce22cd5 100644 --- a/server/src/route/rule_based.rs +++ b/router/src/rule_based.rs @@ -12,10 +12,7 @@ use meta_client::types::ShardId; use serde_derive::Deserialize; use snafu::{ensure, OptionExt}; -use crate::{ - config::Endpoint, - route::{hash, Result, RouteNotFound, Router, ShardNotFound}, -}; +use crate::{endpoint::Endpoint, hash, Result, RouteNotFound, Router, ShardNotFound}; pub type ShardNodes = HashMap; diff --git a/server/Cargo.toml b/server/Cargo.toml index b624c2c07f..595b444e29 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -37,6 +37,7 @@ prometheus-static-metric = { workspace = true } prost = { workspace = true } proto = { workspace = true } query_engine = { workspace = true } +router = { workspace = true } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } @@ -47,8 +48,6 @@ table_engine = { workspace = true } tokio = { workspace = true } tokio-stream = { version = "0.1", features = ["net"] } tonic = { workspace = true } -twox-hash = "1.6" warp = "0.3" - [dev-dependencies] sql = { workspace = true, features = ["test"] } diff --git a/server/src/config.rs b/server/src/config.rs index 
554559c680..e6fe61621b 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -2,22 +2,20 @@ //! Server configs -use std::{collections::HashMap, str::FromStr}; +use std::collections::HashMap; use analytic_engine; -use ceresdbproto::storage; use cluster::config::{ClusterConfig, SchemaConfig}; use common_types::schema::TIMESTAMP_COLUMN; use meta_client::types::ShardId; +use router::{ + endpoint::Endpoint, + rule_based::{ClusterView, RuleList}, +}; use serde_derive::Deserialize; use table_engine::ANALYTIC_ENGINE_TYPE; -use crate::{ - grpc::forward, - http::DEFAULT_MAX_BODY_SIZE, - limiter::LimiterConfig, - route::rule_based::{ClusterView, RuleList}, -}; +use crate::{grpc::forward, http::DEFAULT_MAX_BODY_SIZE, limiter::LimiterConfig}; /// The deployment mode decides how to start the CeresDB. /// @@ -52,65 +50,6 @@ pub struct StaticRouteConfig { pub topology: StaticTopologyConfig, } -#[derive(Debug, Clone, Deserialize, Eq, Hash, PartialEq)] -pub struct Endpoint { - pub addr: String, - pub port: u16, -} - -impl Endpoint { - pub fn new(addr: String, port: u16) -> Self { - Self { addr, port } - } -} - -impl ToString for Endpoint { - fn to_string(&self) -> String { - format!("{}:{}", self.addr, self.port) - } -} - -impl FromStr for Endpoint { - type Err = Box; - - fn from_str(s: &str) -> std::result::Result { - let (addr, raw_port) = match s.rsplit_once(':') { - Some(v) => v, - None => { - let err_msg = "Can't find ':' in the source string".to_string(); - return Err(Self::Err::from(err_msg)); - } - }; - let port = raw_port.parse().map_err(|e| { - let err_msg = format!("Fail to parse port:{}, err:{}", raw_port, e); - Self::Err::from(err_msg) - })?; - - Ok(Endpoint { - addr: addr.to_string(), - port, - }) - } -} - -impl From for storage::Endpoint { - fn from(endpoint: Endpoint) -> Self { - storage::Endpoint { - ip: endpoint.addr, - port: endpoint.port as u32, - } - } -} - -impl From for Endpoint { - fn from(endpoint: storage::Endpoint) -> Self { - Endpoint { - addr: endpoint.ip, - port: endpoint.port as u16, - } - } -} - #[derive(Debug, Clone, Deserialize)] pub struct ShardView { pub shard_id: ShardId, diff --git a/server/src/grpc/forward.rs b/server/src/grpc/forward.rs index b78a5f1e94..e337ea1ccf 100644 --- a/server/src/grpc/forward.rs +++ b/server/src/grpc/forward.rs @@ -11,6 +11,7 @@ use std::{ use async_trait::async_trait; use ceresdbproto::storage::{storage_service_client::StorageServiceClient, RouteRequest}; use log::{debug, error, warn}; +use router::{endpoint::Endpoint, RouterRef}; use serde_derive::Deserialize; use snafu::{ensure, Backtrace, ResultExt, Snafu}; use tonic::{ @@ -18,7 +19,7 @@ use tonic::{ transport::{self, Channel}, }; -use crate::{config::Endpoint, consts::TENANT_HEADER, route::RouterRef}; +use crate::consts::TENANT_HEADER; #[derive(Debug, Snafu)] pub enum Error { @@ -365,10 +366,10 @@ impl Forwarder { mod tests { use ceresdbproto::storage::{QueryRequest, QueryResponse, Route}; use futures::FutureExt; + use router::Router; use tonic::IntoRequest; use super::*; - use crate::route::Router; #[test] fn test_check_loopback_endpoint() { @@ -396,11 +397,7 @@ mod tests { #[async_trait] impl Router for MockRouter { - async fn route( - &self, - _schema: &str, - req: RouteRequest, - ) -> crate::route::Result> { + async fn route(&self, _schema: &str, req: RouteRequest) -> router::Result> { let endpoint = self.routing_tables.get(&req.metrics[0]); match endpoint { None => Ok(vec![]), diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs index b061d53c9d..8d58adcfe7 100644 
--- a/server/src/grpc/mod.rs +++ b/server/src/grpc/mod.rs @@ -27,19 +27,18 @@ use futures::FutureExt; use log::{info, warn}; use proto::remote_engine::remote_engine_service_server::RemoteEngineServiceServer; use query_engine::executor::Executor as QueryExecutor; +use router::{endpoint::Endpoint, RouterRef}; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::engine::EngineRuntimes; use tokio::sync::oneshot::{self, Sender}; use tonic::transport::Server; use crate::{ - config::Endpoint, grpc::{ forward::Forwarder, meta_event_service::MetaServiceImpl, remote_engine_service::RemoteEngineServiceImpl, storage_service::StorageServiceImpl, }, instance::InstanceRef, - route::RouterRef, schema_config_provider::{self, SchemaConfigProviderRef}, }; diff --git a/server/src/grpc/storage_service/error.rs b/server/src/grpc/storage_service/error.rs index b3f97895fd..a8531275f8 100644 --- a/server/src/grpc/storage_service/error.rs +++ b/server/src/grpc/storage_service/error.rs @@ -7,7 +7,7 @@ use common_util::define_result; use http::StatusCode; use snafu::Snafu; -use crate::{error_util, route}; +use crate::error_util; define_result!(Error); @@ -61,18 +61,18 @@ pub fn build_ok_header() -> ResponseHeader { } } -impl From for Error { - fn from(route_err: route::Error) -> Self { +impl From for Error { + fn from(route_err: router::Error) -> Self { match &route_err { - route::Error::RouteNotFound { .. } | route::Error::ShardNotFound { .. } => { + router::Error::RouteNotFound { .. } | router::Error::ShardNotFound { .. } => { Error::ErrNoCause { code: StatusCode::NOT_FOUND, msg: route_err.to_string(), } } - route::Error::ParseEndpoint { .. } - | route::Error::OtherWithCause { .. } - | route::Error::OtherNoCause { .. } => Error::ErrNoCause { + router::Error::ParseEndpoint { .. } + | router::Error::OtherWithCause { .. } + | router::Error::OtherNoCause { .. 
} => Error::ErrNoCause { code: StatusCode::INTERNAL_SERVER_ERROR, msg: route_err.to_string(), }, diff --git a/server/src/grpc/storage_service/mod.rs b/server/src/grpc/storage_service/mod.rs index 298dd9baff..2097c68d68 100644 --- a/server/src/grpc/storage_service/mod.rs +++ b/server/src/grpc/storage_service/mod.rs @@ -29,6 +29,7 @@ use http::StatusCode; use log::{error, warn}; use paste::paste; use query_engine::executor::Executor as QueryExecutor; +use router::{Router, RouterRef}; use snafu::{ensure, OptionExt, ResultExt}; use sql::plan::CreateTablePlan; use table_engine::engine::EngineRuntimes; @@ -44,7 +45,6 @@ use crate::{ storage_service::error::{ErrNoCause, ErrWithCause, Result}, }, instance::InstanceRef, - route::{Router, RouterRef}, schema_config_provider::SchemaConfigProviderRef, }; diff --git a/server/src/grpc/storage_service/query.rs b/server/src/grpc/storage_service/query.rs index ade4bd181c..af29ae2991 100644 --- a/server/src/grpc/storage_service/query.rs +++ b/server/src/grpc/storage_service/query.rs @@ -17,6 +17,7 @@ use http::StatusCode; use interpreters::{context::Context as InterpreterContext, factory::Factory, interpreter::Output}; use log::{error, info, warn}; use query_engine::executor::{Executor as QueryExecutor, RecordBatchVec}; +use router::endpoint::Endpoint; use snafu::{ensure, ResultExt}; use sql::{ frontend::{Context as SqlContext, Frontend}, @@ -24,14 +25,11 @@ use sql::{ }; use tonic::{transport::Channel, IntoRequest}; -use crate::{ - config::Endpoint, - grpc::{ - forward::{ForwardRequest, ForwardResult}, - storage_service::{ - error::{ErrNoCause, ErrWithCause, Result}, - HandlerContext, - }, +use crate::grpc::{ + forward::{ForwardRequest, ForwardResult}, + storage_service::{ + error::{ErrNoCause, ErrWithCause, Result}, + HandlerContext, }, }; diff --git a/server/src/http.rs b/server/src/http.rs index 2c283a2ef9..3aec942ac6 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -10,6 +10,7 @@ use log::error; use logger::RuntimeLevel; use profile::Profiler; use query_engine::executor::Executor as QueryExecutor; +use router::endpoint::Endpoint; use serde_derive::Serialize; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::{engine::EngineRuntimes, table::FlushRequest}; @@ -23,7 +24,6 @@ use warp::{ }; use crate::{ - config::Endpoint, consts, context::RequestContext, error_util, diff --git a/server/src/lib.rs b/server/src/lib.rs index 1be4335f41..266ad813e9 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -21,7 +21,6 @@ pub mod local_tables; pub mod logger; mod metrics; mod mysql; -pub mod route; pub mod schema_config_provider; pub mod server; pub mod table_engine; diff --git a/server/src/server.rs b/server/src/server.rs index c42339f0b5..4cd341f9af 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -11,11 +11,12 @@ use interpreters::table_manipulator::TableManipulatorRef; use log::{info, warn}; use logger::RuntimeLevel; use query_engine::executor::Executor as QueryExecutor; +use router::{endpoint::Endpoint, RouterRef}; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::engine::{EngineRuntimes, TableEngineRef}; use crate::{ - config::{Config, Endpoint}, + config::Config, grpc::{self, RpcServices}, http::{self, HttpConfig, Service}, instance::{Instance, InstanceRef}, @@ -23,7 +24,6 @@ use crate::{ local_tables::{self, LocalTablesRecoverer}, mysql, mysql::error::Error as MysqlError, - route::RouterRef, schema_config_provider::SchemaConfigProviderRef, }; diff --git a/src/setup.rs b/src/setup.rs index 
7f4788849d..234e66f319 100644 --- a/src/setup.rs +++ b/src/setup.rs @@ -19,14 +19,14 @@ use log::info; use logger::RuntimeLevel; use meta_client::meta_impl; use query_engine::executor::{Executor, ExecutorImpl}; +use router::{ + cluster_based::ClusterBasedRouter, + rule_based::{ClusterView, RuleBasedRouter}, +}; use server::{ config::{Config, DeployMode, RuntimeConfig, StaticTopologyConfig}, limiter::Limiter, local_tables::LocalTablesRecoverer, - route::{ - cluster_based::ClusterBasedRouter, - rule_based::{ClusterView, RuleBasedRouter}, - }, schema_config_provider::{ cluster_based::ClusterBasedProvider, config_based::ConfigBasedProvider, }, diff --git a/table_engine/src/predicate.rs b/table_engine/src/predicate.rs index 20955ab6c1..f17cf3377a 100644 --- a/table_engine/src/predicate.rs +++ b/table_engine/src/predicate.rs @@ -20,11 +20,17 @@ use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; #[derive(Debug, Snafu)] #[snafu(visibility = "pub")] pub enum Error { - #[snafu(display("Failed ot do pruning, err:{}", source))] + #[snafu(display("Failed to do pruning, err:{}", source))] Prune { source: datafusion::error::DataFusionError, }, + #[snafu(display("Failed to convert predicate to pb, msg:{}, err:{}", msg, source))] + PredicateToPb { + msg: String, + source: datafusion::error::DataFusionError, + }, + #[snafu(display("Empty time range.\nBacktrace:\n{}", backtrace))] EmptyTimeRange { backtrace: Backtrace }, @@ -81,6 +87,29 @@ impl Predicate { } } +impl TryFrom<&Predicate> for proto::remote_engine::Predicate { + type Error = Error; + + fn try_from(predicate: &Predicate) -> std::result::Result { + let time_range = predicate.time_range; + let mut exprs = Vec::with_capacity(predicate.exprs.len()); + for expr in &predicate.exprs { + let expr = expr + .to_bytes() + .context(PredicateToPb { + msg: format!("convert expr failed, expr:{}", expr), + })? + .to_vec(); + exprs.push(expr); + } + + Ok(Self { + exprs, + time_range: Some(time_range.into()), + }) + } +} + impl TryFrom for Predicate { type Error = Error; diff --git a/table_engine/src/remote/mock_impl.rs b/table_engine/src/remote/mock_impl.rs deleted file mode 100644 index 55bb821795..0000000000 --- a/table_engine/src/remote/mock_impl.rs +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. - -//! 
Mock impl for remote table engine - -use async_trait::async_trait; -pub struct MockImpl; - -#[async_trait] -impl RemoteEngine for MockImpl { - /// Read from the remote engine - async fn read(&self, request: ReadRequest) -> Result { - todo!() - } - - /// Write to the remote engine - async fn write(&self, request: WriteRequest) -> Result { - todo!() - } -} diff --git a/table_engine/src/remote/mod.rs b/table_engine/src/remote/mod.rs index 502baaccb8..c540e3a29f 100644 --- a/table_engine/src/remote/mod.rs +++ b/table_engine/src/remote/mod.rs @@ -7,45 +7,22 @@ pub mod model; use async_trait::async_trait; use common_util::define_result; use model::{ReadRequest, WriteRequest}; -use snafu::{Backtrace, Snafu}; +use snafu::Snafu; use crate::stream::SendableRecordBatchStream; #[derive(Debug, Snafu)] +#[snafu(visibility(pub))] pub enum Error { - #[snafu(display("Empty table identifier.\nBacktrace:\n{}", backtrace))] - EmptyTableIdentifier { backtrace: Backtrace }, - - #[snafu(display("Empty table read request.\nBacktrace:\n{}", backtrace))] - EmptyTableReadRequest { backtrace: Backtrace }, - - #[snafu(display("Empty table schema.\nBacktrace:\n{}", backtrace))] - EmptyTableSchema { backtrace: Backtrace }, - - #[snafu(display("Empty row group.\nBacktrace:\n{}", backtrace))] - EmptyRowGroup { backtrace: Backtrace }, - - #[snafu(display("Failed to covert table read request, err:{}", source))] - ConvertTableReadRequest { + #[snafu(display("Failed to read from remote, err:{}", source))] + Read { source: Box, }, - #[snafu(display("Failed to covert table schema, err:{}", source))] - ConvertTableSchema { + #[snafu(display("Failed to write to remote, err:{}", source))] + Write { source: Box, }, - - #[snafu(display("Failed to covert row group, err:{}", source))] - ConvertRowGroup { - source: Box, - }, - - #[snafu(display( - "Failed to covert row group, encoding version:{}.\nBacktrace:\n{}", - version, - backtrace - ))] - UnsupportedConvertRowGroup { version: u32, backtrace: Backtrace }, } define_result!(Error); @@ -53,9 +30,9 @@ define_result!(Error); /// Remote table engine interface #[async_trait] pub trait RemoteEngine { - /// Read from the remote engine + /// Read from the remote engine. async fn read(&self, request: ReadRequest) -> Result; - /// Write to the remote engine + /// Write to the remote engine. 
async fn write(&self, request: WriteRequest) -> Result; } diff --git a/table_engine/src/remote/model.rs b/table_engine/src/remote/model.rs index 74e130d562..2a042a4234 100644 --- a/table_engine/src/remote/model.rs +++ b/table_engine/src/remote/model.rs @@ -4,20 +4,77 @@ use common_types::schema::Schema; use common_util::avro; -use snafu::{OptionExt, ResultExt}; +use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; -use crate::{ - remote::{ - ConvertRowGroup, ConvertTableReadRequest, ConvertTableSchema, EmptyRowGroup, - EmptyTableIdentifier, EmptyTableReadRequest, EmptyTableSchema, Error, - UnsupportedConvertRowGroup, +use crate::table::{ReadRequest as TableReadRequest, WriteRequest as TableWriteRequest}; + +const ENCODE_ROWS_WITH_AVRO: u32 = 1; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Failed to convert read request to pb, err:{}", source))] + ReadRequestToPb { source: crate::table::Error }, + + #[snafu(display( + "Failed to convert write request to pb, table_ident:{:?}, msg:{}.\nBacktrace:\n{}", + table_ident, + msg, + backtrace + ))] + WriteRequestToPbNoCause { + table_ident: TableIdentifier, + msg: String, + backtrace: Backtrace, }, - table::{ReadRequest as TableReadRequest, WriteRequest as TableWriteRequest}, -}; -const ENCODE_ROWS_WITH_AVRO: u32 = 0; + #[snafu(display( + "Failed to convert write request to pb, table_ident:{:?}, err:{}", + table_ident, + source + ))] + WriteRequestToPbWithCause { + table_ident: TableIdentifier, + source: avro::Error, + }, + + #[snafu(display("Empty table identifier.\nBacktrace:\n{}", backtrace))] + EmptyTableIdentifier { backtrace: Backtrace }, + + #[snafu(display("Empty table read request.\nBacktrace:\n{}", backtrace))] + EmptyTableReadRequest { backtrace: Backtrace }, -#[derive(Debug)] + #[snafu(display("Empty table schema.\nBacktrace:\n{}", backtrace))] + EmptyTableSchema { backtrace: Backtrace }, + + #[snafu(display("Empty row group.\nBacktrace:\n{}", backtrace))] + EmptyRowGroup { backtrace: Backtrace }, + + #[snafu(display("Failed to convert table read request, err:{}", source))] + ConvertTableReadRequest { + source: Box, + }, + + #[snafu(display("Failed to convert table schema, err:{}", source))] + ConvertTableSchema { + source: Box, + }, + + #[snafu(display("Failed to convert row group, err:{}", source))] + ConvertRowGroup { + source: Box, + }, + + #[snafu(display( + "Failed to convert row group, encoding version:{}.\nBacktrace:\n{}", + version, + backtrace + ))] + UnsupportedConvertRowGroup { version: u32, backtrace: Backtrace }, +} + +define_result!(Error); + +#[derive(Debug, Clone)] pub struct TableIdentifier { pub catalog: String, pub schema: String, @@ -34,6 +91,16 @@ impl From for TableIdentifier { } } +impl From for proto::remote_engine::TableIdentifier { + fn from(table_ident: TableIdentifier) -> Self { + Self { + catalog: table_ident.catalog, + schema: table_ident.schema, + table: table_ident.table, + } + } +} + pub struct ReadRequest { pub table: TableIdentifier, pub read_request: TableReadRequest, @@ -42,7 +109,7 @@ impl TryFrom for ReadRequest { type Error = Error; - fn try_from(pb: proto::remote_engine::ReadRequest) -> Result { + fn try_from(pb: proto::remote_engine::ReadRequest) -> std::result::Result { let table_identifier = pb.table.context(EmptyTableIdentifier)?; let table_read_request = pb.read_request.context(EmptyTableReadRequest)?; Ok(Self { @@ -55,7 +122,20 @@ impl TryFrom for ReadRequest { } } -#[allow(dead_code)] +impl TryFrom for proto::remote_engine::ReadRequest { + type Error
= Error; + + fn try_from(request: ReadRequest) -> std::result::Result { + let table_pb = request.table.into(); + let request_pb = request.read_request.try_into().context(ReadRequestToPb)?; + + Ok(Self { + table: Some(table_pb), + read_request: Some(request_pb), + }) + } +} + pub struct WriteRequest { pub table: TableIdentifier, pub write_request: TableWriteRequest, @@ -64,7 +144,7 @@ pub struct WriteRequest { impl TryFrom for WriteRequest { type Error = Error; - fn try_from(pb: proto::remote_engine::WriteRequest) -> Result { + fn try_from(pb: proto::remote_engine::WriteRequest) -> std::result::Result { let table_identifier = pb.table.context(EmptyTableIdentifier)?; let row_group_pb = pb.row_group.context(EmptyRowGroup)?; let table_schema: Schema = row_group_pb @@ -89,3 +169,35 @@ impl TryFrom for WriteRequest { }) } } + +impl TryFrom for proto::remote_engine::WriteRequest { + type Error = Error; + + fn try_from(request: WriteRequest) -> std::result::Result { + // Row group to pb. + let row_group = request.write_request.row_group; + let table_schema_pb = row_group.schema().into(); + let min_timestamp = row_group.min_timestamp().as_i64(); + let max_timestamp = row_group.max_timestmap().as_i64(); + let avro_rows = + avro::row_group_to_avro_rows(row_group).context(WriteRequestToPbWithCause { + table_ident: request.table.clone(), + })?; + + let row_group_pb = proto::remote_engine::RowGroup { + version: ENCODE_ROWS_WITH_AVRO, + table_schema: Some(table_schema_pb), + rows: avro_rows, + min_timestamp, + max_timestamp, + }; + + // Table ident to pb. + let table_pb = request.table.into(); + + Ok(Self { + table: Some(table_pb), + row_group: Some(row_group_pb), + }) + } +} diff --git a/table_engine/src/table.rs b/table_engine/src/table.rs index 005352ba3c..6674238f2c 100644 --- a/table_engine/src/table.rs +++ b/table_engine/src/table.rs @@ -125,6 +125,12 @@ pub enum Error { source: Box, }, + #[snafu(display("Failed to convert read request to pb, msg:{}, err:{}", msg, source))] + ReadRequestToPb { + msg: String, + source: Box, + }, + #[snafu(display("Empty read options.\nBacktrace:\n{}", backtrace))] EmptyReadOptions { backtrace: Backtrace }, @@ -342,7 +348,7 @@ pub struct GetRequest { #[derive(Copy, Clone, Debug)] pub enum ReadOrder { /// No order requirements from the read request. - None, + None = 0, Asc, Desc, } @@ -370,6 +376,11 @@ impl ReadOrder { pub fn is_in_desc_order(&self) -> bool { matches!(self, ReadOrder::Desc) } + + #[inline] + pub fn into_i32(self) -> i32 { + self as i32 + } } #[derive(Debug)] @@ -387,6 +398,32 @@ pub struct ReadRequest { pub order: ReadOrder, } +impl TryFrom for proto::remote_engine::TableReadRequest { + type Error = Error; + + fn try_from(request: ReadRequest) -> std::result::Result { + let predicate_pb = request + .predicate + .as_ref() + .try_into() + .map_err(|e| Box::new(e) as _) + .context(ReadRequestToPb { + msg: format!( + "convert predicate failed, predicate:{:?}", + request.predicate + ), + })?; + + Ok(Self { + request_id: request.request_id.as_u64(), + opts: Some(request.opts.into()), + projected_schema: Some(request.projected_schema.into()), + predicate: Some(predicate_pb), + order: request.order.into_i32(), + }) + } +} + impl TryFrom for ReadRequest { type Error = Error;
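
A minimal usage sketch for the `Endpoint` type added in `router/src/endpoint.rs` above. This is not code from the patch itself; it relies only on the `FromStr` and `ToString` impls shown in the diff, and the address and port are illustrative values:

use router::endpoint::Endpoint;

fn main() {
    // `FromStr` splits on the last ':' via `rsplit_once`, so exactly one
    // trailing ":port" is peeled off and the rest is kept as the address.
    let endpoint: Endpoint = "127.0.0.1:8831".parse().expect("valid endpoint");
    assert_eq!(endpoint.addr, "127.0.0.1");
    assert_eq!(endpoint.port, 8831);

    // `ToString` is the inverse; `ChannelPool` uses this string as the dial
    // target after `make_formatted_endpoint` prefixes it with "http://".
    assert_eq!(endpoint.to_string(), "127.0.0.1:8831");
}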