diff --git a/Cargo.lock b/Cargo.lock index 8ed42f1052..aa6026c0d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1099,6 +1099,7 @@ name = "common_util" version = "1.0.0-alpha01" dependencies = [ "arrow", + "avro-rs", "backtrace", "chrono", "common_types 1.0.0-alpha01", @@ -1115,6 +1116,7 @@ dependencies = [ "proto 1.0.0-alpha01", "serde", "serde_derive", + "serde_json", "slog", "slog-global", "snafu 0.6.10", @@ -5241,7 +5243,6 @@ dependencies = [ "analytic_engine", "arrow", "async-trait", - "avro-rs", "bytes 1.2.1", "catalog", "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=b9c45bcdbf7d55d5889d42b4c8017282819e6049)", @@ -5263,6 +5264,7 @@ dependencies = [ "prometheus 0.12.0", "prometheus-static-metric", "prost", + "proto 1.0.0-alpha01", "query_engine", "serde", "serde_derive", @@ -5841,6 +5843,7 @@ dependencies = [ "common_util", "datafusion", "datafusion-expr", + "datafusion-proto", "df_operator", "env_logger", "futures 0.3.21", diff --git a/common_types/src/projected_schema.rs b/common_types/src/projected_schema.rs index 16578cc39e..b76822fb42 100644 --- a/common_types/src/projected_schema.rs +++ b/common_types/src/projected_schema.rs @@ -4,7 +4,7 @@ use std::{fmt, sync::Arc}; -use snafu::{ensure, Backtrace, ResultExt, Snafu}; +use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; use crate::{ column_schema::{ColumnSchema, ReadOp}, @@ -36,6 +36,14 @@ pub enum Error { backtrace ))] MissingReadColumn { name: String, backtrace: Backtrace }, + + #[snafu(display("Empty table schema.\nBacktrace:\n{}", backtrace))] + EmptyTableSchema { backtrace: Backtrace }, + + #[snafu(display("Failed to covert table schema, err:{}", source))] + ConvertTableSchema { + source: Box, + }, } pub type Result = std::result::Result; @@ -147,6 +155,24 @@ impl ProjectedSchema { } } +impl TryFrom for ProjectedSchema { + type Error = Error; + + fn try_from(pb: proto::common::ProjectedSchema) -> std::result::Result { + let schema: Schema = pb + .table_schema + .context(EmptyTableSchema)? + .try_into() + .map_err(|e| Box::new(e) as _) + .context(ConvertTableSchema)?; + let projection = pb + .projection + .map(|v| v.idx.into_iter().map(|id| id as usize).collect()); + + ProjectedSchema::new(schema, projection) + } +} + /// Schema with projection informations struct ProjectedSchemaInner { /// The schema before projection that the reader intended to read, may diff --git a/common_util/Cargo.toml b/common_util/Cargo.toml index 9a6496918b..296d71a935 100644 --- a/common_util/Cargo.toml +++ b/common_util/Cargo.toml @@ -16,6 +16,7 @@ test = ["env_logger"] [dependencies] # In alphabetical order arrow = { workspace = true } +avro-rs = "0.13" backtrace = "0.3.9" chrono = { workspace = true } common_types = { workspace = true, features = ["test"] } @@ -30,6 +31,7 @@ prometheus = { workspace = true } proto = { workspace = true } serde = { workspace = true } serde_derive = { workspace = true } +serde_json = { workspace = true } snafu = { workspace = true } time = "0.1" tokio = { workspace = true } diff --git a/server/src/avro_util.rs b/common_util/src/avro.rs similarity index 62% rename from server/src/avro_util.rs rename to common_util/src/avro.rs index 726ddd2b90..187e8f965c 100644 --- a/server/src/avro_util.rs +++ b/common_util/src/avro.rs @@ -7,17 +7,23 @@ use std::collections::HashMap; use avro_rs::{ schema::{Name, RecordField, RecordFieldOrder}, types::{Record, Value}, + Schema as AvroSchema, }; use common_types::{ - bytes::ByteVec, + bytes::{ByteVec, Bytes}, column::ColumnBlock, datum::{Datum, DatumKind}, record_batch::RecordBatch, - schema::RecordSchema, + row::{Row, RowGroup, RowGroupBuilder}, + schema::{RecordSchema, Schema}, + string::StringBytes, + time::Timestamp, }; -use common_util::define_result; use snafu::{Backtrace, ResultExt, Snafu}; +/// Schema name of the record +const RECORD_NAME: &str = "Result"; + #[derive(Debug, Snafu)] pub enum Error { #[snafu(display( @@ -29,6 +35,21 @@ pub enum Error { source: avro_rs::Error, backtrace: Backtrace, }, + + #[snafu(display("Failed to convert to avro record, err:{}", source))] + ConvertToAvroRecord { + source: Box, + }, + + #[snafu(display( + "Invalid avro record, expect record, value:{:?}.\nBacktrace:\n{}", + value, + backtrace + ))] + InvalidAvroRecord { value: Value, backtrace: Backtrace }, + + #[snafu(display("Unsupported arvo type, value:{:?}.\nBacktrace:\n{}", value, backtrace))] + UnsupportedType { value: Value, backtrace: Backtrace }, } define_result!(Error); @@ -85,6 +106,25 @@ pub fn to_avro_schema(name: &str, schema: &RecordSchema) -> avro_rs::Schema { } } +pub fn record_batch_to_avro_rows(record_batch: &RecordBatch) -> Result> { + let mut rows = Vec::new(); + let avro_schema = to_avro_schema(RECORD_NAME, record_batch.schema()); + record_batch_to_avro(record_batch, &avro_schema, &mut rows)?; + Ok(rows) +} + +pub fn avro_rows_to_row_group(schema: Schema, rows: &[Vec]) -> Result { + let avro_schema = to_avro_schema(RECORD_NAME, &schema.to_record_schema()); + let mut builder = RowGroupBuilder::with_capacity(schema.clone(), rows.len()); + for raw_row in rows { + let mut row = Vec::with_capacity(schema.num_columns()); + avro_row_to_row(&avro_schema, raw_row, &mut row)?; + builder.push_checked_row(Row::from_datums(row)); + } + + Ok(builder.build()) +} + fn data_type_to_schema(data_type: &DatumKind) -> avro_rs::Schema { match data_type { DatumKind::Null => avro_rs::Schema::Null, @@ -104,7 +144,7 @@ fn data_type_to_schema(data_type: &DatumKind) -> avro_rs::Schema { } /// Convert record batch to avro format -pub fn record_batch_to_avro( +fn record_batch_to_avro( record_batch: &RecordBatch, schema: &avro_rs::Schema, rows: &mut Vec, @@ -158,6 +198,41 @@ fn column_to_value(array: &ColumnBlock, row_idx: usize, is_nullable: bool) -> Va } } +/// Convert the avro `Value` into the `Datum`. +/// +/// Some types defined by avro are not used and the conversion rule is totally +/// based on the implementation in the server. +fn avro_value_to_datum(value: Value) -> Result { + let datum = match value { + Value::Null => Datum::Null, + Value::TimestampMillis(v) => Datum::Timestamp(Timestamp::new(v)), + Value::Double(v) => Datum::Double(v), + Value::Float(v) => Datum::Float(v), + Value::Bytes(v) => Datum::Varbinary(Bytes::from(v)), + Value::String(v) => Datum::String(StringBytes::from(v)), + // FIXME: Now the server converts both uint64 and int64 into`Value::Long` because uint64 is + // not supported by avro, that is to say something may go wrong in some corner case. + Value::Long(v) => Datum::Int64(v), + Value::Int(v) => Datum::Int32(v), + Value::Boolean(v) => Datum::Boolean(v), + Value::Union(inner_val) => avro_value_to_datum(*inner_val)?, + Value::Fixed(_, _) + | Value::Enum(_, _) + | Value::Array(_) + | Value::Map(_) + | Value::Record(_) + | Value::Date(_) + | Value::Decimal(_) + | Value::TimeMillis(_) + | Value::TimeMicros(_) + | Value::TimestampMicros(_) + | Value::Duration(_) + | Value::Uuid(_) => return UnsupportedType { value }.fail(), + }; + + Ok(datum) +} + #[inline] fn may_union(val: Value, is_nullable: bool) -> Value { if is_nullable { @@ -166,3 +241,19 @@ fn may_union(val: Value, is_nullable: bool) -> Value { val } } + +fn avro_row_to_row(schema: &AvroSchema, mut raw: &[u8], row: &mut Vec) -> Result<()> { + let record = avro_rs::from_avro_datum(schema, &mut raw, None) + .map_err(|e| Box::new(e) as _) + .context(ConvertToAvroRecord)?; + if let Value::Record(cols) = record { + for (_, column_value) in cols { + let datum = avro_value_to_datum(column_value)?; + row.push(datum); + } + + Ok(()) + } else { + InvalidAvroRecord { value: record }.fail() + } +} diff --git a/common_util/src/lib.rs b/common_util/src/lib.rs index 00c9e31139..88cec3674d 100644 --- a/common_util/src/lib.rs +++ b/common_util/src/lib.rs @@ -9,6 +9,7 @@ pub mod macros; // TODO(yingwen): Move some mod into components as a crate pub mod alloc_tracker; +pub mod avro; pub mod codec; pub mod config; pub mod error; diff --git a/proto/protos/common.proto b/proto/protos/common.proto index d46f433d5d..f7871be4e0 100644 --- a/proto/protos/common.proto +++ b/proto/protos/common.proto @@ -61,3 +61,13 @@ message TimeRange { // exclusive end int64 end = 2; } + +// Projected Schema +message ProjectedSchema { + common.TableSchema table_schema = 1; + Projection projection = 2; +} + +message Projection { + repeated uint64 idx = 1; +} diff --git a/proto/protos/remote_engine.proto b/proto/protos/remote_engine.proto index b175e9d2cc..764e80068d 100644 --- a/proto/protos/remote_engine.proto +++ b/proto/protos/remote_engine.proto @@ -26,15 +26,6 @@ message ReadOptions { uint64 read_parallelism = 2; } -message Projection { - repeated uint64 idx = 1; -} - -message ProjectedSchema { - common.TableSchema table_schema = 1; - Projection projection = 2; -} - message Predicate { repeated bytes exprs = 1; common.TimeRange time_range = 2; @@ -49,7 +40,7 @@ enum ReadOrder { message TableReadRequest { uint64 request_id = 1; ReadOptions opts = 2; - ProjectedSchema projected_schema = 3; + common.ProjectedSchema projected_schema = 3; Predicate predicate = 4; ReadOrder order = 5; } @@ -61,14 +52,18 @@ message ReadRequest { message ReadResponse { ResponseHeader header = 1; - repeated bytes rows = 2; + // Version of row encoding method + uint32 version = 2; + repeated bytes rows = 3; } message RowGroup { + // Version of row encoding method common.TableSchema table_schema = 1; - repeated bytes rows = 2; - int64 min_timestamp = 3; - int64 max_timestamp = 4; + int64 min_timestamp = 2; + int64 max_timestamp = 3; + uint32 version = 4; + repeated bytes rows = 5; } message WriteRequest { diff --git a/server/Cargo.toml b/server/Cargo.toml index eb05913bf8..b624c2c07f 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -14,7 +14,6 @@ workspace = true analytic_engine = { workspace = true } arrow = { workspace = true } async-trait = { workspace = true } -avro-rs = "0.13" bytes = { workspace = true } catalog = { workspace = true } ceresdbproto = { workspace = true } @@ -36,6 +35,7 @@ profile = { workspace = true } prometheus = { workspace = true } prometheus-static-metric = { workspace = true } prost = { workspace = true } +proto = { workspace = true } query_engine = { workspace = true } serde = { workspace = true } serde_derive = { workspace = true } diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs index 2cf342263e..b061d53c9d 100644 --- a/server/src/grpc/mod.rs +++ b/server/src/grpc/mod.rs @@ -25,6 +25,7 @@ use common_util::{ }; use futures::FutureExt; use log::{info, warn}; +use proto::remote_engine::remote_engine_service_server::RemoteEngineServiceServer; use query_engine::executor::Executor as QueryExecutor; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::engine::EngineRuntimes; @@ -35,7 +36,7 @@ use crate::{ config::Endpoint, grpc::{ forward::Forwarder, meta_event_service::MetaServiceImpl, - storage_service::StorageServiceImpl, + remote_engine_service::RemoteEngineServiceImpl, storage_service::StorageServiceImpl, }, instance::InstanceRef, route::RouterRef, @@ -45,6 +46,7 @@ use crate::{ pub mod forward; mod meta_event_service; mod metrics; +mod remote_engine_service; mod storage_service; #[derive(Debug, Snafu)] @@ -146,6 +148,7 @@ pub struct RpcServices { serve_addr: SocketAddr, rpc_server: StorageServiceServer>, meta_rpc_server: Option>>, + remote_engine_server: RemoteEngineServiceServer>, runtime: Arc, stop_tx: Option>, join_handle: Option>, @@ -155,6 +158,7 @@ impl RpcServices { pub async fn start(&mut self) -> Result<()> { let rpc_server = self.rpc_server.clone(); let meta_rpc_server = self.meta_rpc_server.clone(); + let remote_engine_server = self.remote_engine_server.clone(); let serve_addr = self.serve_addr; let (stop_tx, stop_rx) = oneshot::channel(); let join_handle = self.runtime.spawn(async move { @@ -167,6 +171,9 @@ impl RpcServices { router = router.add_service(s); }; + info!("Grpc server serves remote engine rpc service"); + router = router.add_service(remote_engine_server); + let serve_res = router .serve_with_shutdown(serve_addr, stop_rx.map(drop)) .await; @@ -277,6 +284,14 @@ impl Builder { MetaEventServiceServer::new(meta_service) }); + let remote_engine_server = { + let service = RemoteEngineServiceImpl { + instance: instance.clone(), + runtimes: runtimes.clone(), + }; + RemoteEngineServiceServer::new(service) + }; + let forward_config = self.forward_config.unwrap_or_default(); let forwarder = if forward_config.enable { let local_endpoint = @@ -306,6 +321,7 @@ impl Builder { serve_addr, rpc_server, meta_rpc_server, + remote_engine_server, runtime: bg_runtime, stop_tx: None, join_handle: None, diff --git a/server/src/grpc/remote_engine_service/error.rs b/server/src/grpc/remote_engine_service/error.rs new file mode 100644 index 0000000000..5f52dc2de6 --- /dev/null +++ b/server/src/grpc/remote_engine_service/error.rs @@ -0,0 +1,81 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Error definitions for meta event service. + +use common_util::define_result; +use proto::remote_engine::ResponseHeader; +use snafu::Snafu; + +use crate::error_util; + +define_result!(Error); + +#[derive(Snafu, Debug)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Server error, code:{:?}, message:{}", code, msg))] + ErrNoCause { code: StatusCode, msg: String }, + + #[snafu(display("Server error, code:{:?}, message:{}, cause:{}", code, msg, source))] + ErrWithCause { + code: StatusCode, + msg: String, + source: Box, + }, +} + +impl Error { + pub fn code(&self) -> StatusCode { + match *self { + Error::ErrNoCause { code, .. } => code, + Error::ErrWithCause { code, .. } => code, + } + } + + /// Get the error message returned to the user. + pub fn error_message(&self) -> String { + match self { + Error::ErrNoCause { msg, .. } => msg.clone(), + + Error::ErrWithCause { msg, source, .. } => { + let err_string = source.to_string(); + let first_line = error_util::first_line_in_error(&err_string); + format!("{}. Caused by: {}", msg, first_line) + } + } + } +} + +/// A set of codes for meta event service. +/// +/// Note that such a set of codes is different with the codes (alias to http +/// status code) used by storage service. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub enum StatusCode { + #[default] + Ok = 0, + BadRequest = 401, + NotFound = 404, + Internal = 500, +} + +impl StatusCode { + #[inline] + pub fn as_u32(self) -> u32 { + self as u32 + } +} + +pub fn build_err_header(err: Error) -> ResponseHeader { + ResponseHeader { + code: err.code().as_u32(), + error: err.error_message(), + } +} + +pub fn build_ok_header() -> ResponseHeader { + ResponseHeader { + code: StatusCode::Ok.as_u32(), + ..Default::default() + } +} diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs new file mode 100644 index 0000000000..64d55c61da --- /dev/null +++ b/server/src/grpc/remote_engine_service/mod.rs @@ -0,0 +1,284 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +// Remote engine rpc service implementation. + +use std::sync::Arc; + +use async_trait::async_trait; +use catalog::manager::ManagerRef; +use common_types::record_batch::RecordBatch; +use common_util::avro; +use futures::stream::{self, BoxStream, StreamExt}; +use log::error; +use proto::remote_engine::{ + remote_engine_service_server::RemoteEngineService, ReadRequest, ReadResponse, WriteRequest, + WriteResponse, +}; +use query_engine::executor::Executor as QueryExecutor; +use snafu::{OptionExt, ResultExt}; +use table_engine::{ + engine::EngineRuntimes, remote::model::TableIdentifier, stream::PartitionedStreams, + table::TableRef, +}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{Request, Response, Status}; + +use crate::{ + grpc::remote_engine_service::error::{ + build_ok_header, ErrNoCause, ErrWithCause, Result, StatusCode, + }, + instance::InstanceRef, +}; + +pub(crate) mod error; + +const STREAM_QUERY_CHANNEL_LEN: usize = 20; +const ENCODE_ROWS_WITH_AVRO: u32 = 0; + +#[derive(Clone)] +pub struct RemoteEngineServiceImpl { + pub instance: InstanceRef, + pub runtimes: Arc, +} + +impl RemoteEngineServiceImpl { + async fn stream_read_internal( + &self, + request: Request, + ) -> Result>> { + let ctx = self.handler_ctx(); + let (tx, rx) = mpsc::channel(STREAM_QUERY_CHANNEL_LEN); + let handle = self.runtimes.read_runtime.spawn(async move { + let read_request = request.into_inner(); + handle_stream_read(ctx, read_request).await + }); + let streams = handle + .await + .map_err(|e| Box::new(e) as _) + .context(ErrWithCause { + code: StatusCode::Internal, + msg: "fail to join task", + })??; + + for stream in streams.streams { + let mut stream = stream.map(|result| { + result.map_err(|e| Box::new(e) as _).context(ErrWithCause { + code: StatusCode::Internal, + msg: "record batch failed", + }) + }); + let tx = tx.clone(); + let _ = self.runtimes.read_runtime.spawn(async move { + while let Some(batch) = stream.next().await { + if let Err(e) = tx.send(batch).await { + error!("Failed to send handler result, err:{}.", e); + break; + } + } + }); + } + Ok(ReceiverStream::new(rx)) + } + + async fn write_internal( + &self, + request: Request, + ) -> std::result::Result, Status> { + let ctx = self.handler_ctx(); + let handle = self.runtimes.write_runtime.spawn(async move { + let request = request.into_inner(); + handle_write(ctx, request).await + }); + + let res = handle + .await + .map_err(|e| Box::new(e) as _) + .context(ErrWithCause { + code: StatusCode::Internal, + msg: "fail to join task", + }); + + let mut resp = WriteResponse::default(); + match res { + Ok(Ok(_)) => { + resp.header = Some(error::build_ok_header()); + } + Ok(Err(e)) | Err(e) => { + resp.header = Some(error::build_err_header(e)); + } + }; + + Ok(tonic::Response::new(resp)) + } + + fn handler_ctx(&self) -> HandlerContext { + HandlerContext { + catalog_manager: self.instance.catalog_manager.clone(), + } + } +} + +/// Context for handling all kinds of remote engine service. +struct HandlerContext { + catalog_manager: ManagerRef, +} + +#[async_trait] +impl RemoteEngineService for RemoteEngineServiceImpl { + type ReadStream = BoxStream<'static, std::result::Result>; + + async fn read( + &self, + request: Request, + ) -> std::result::Result, Status> { + match self.stream_read_internal(request).await { + Ok(stream) => { + let new_stream: Self::ReadStream = Box::pin(stream.map(|res| match res { + Ok(record_batch) => { + let resp = match avro::record_batch_to_avro_rows(&record_batch) + .map_err(|e| Box::new(e) as _) + .context(ErrWithCause { + code: StatusCode::Internal, + msg: "fail to convert record batch to avro", + }) { + Err(e) => ReadResponse { + header: Some(error::build_err_header(e)), + ..Default::default() + }, + Ok(rows) => ReadResponse { + header: Some(build_ok_header()), + version: ENCODE_ROWS_WITH_AVRO, + rows, + }, + }; + + Ok(resp) + } + Err(e) => { + let resp = ReadResponse { + header: Some(error::build_err_header(e)), + ..Default::default() + }; + Ok(resp) + } + })); + + Ok(tonic::Response::new(new_stream)) + } + Err(e) => { + let resp = ReadResponse { + header: Some(error::build_err_header(e)), + ..Default::default() + }; + let stream = stream::once(async { Ok(resp) }); + Ok(tonic::Response::new(Box::pin(stream))) + } + } + } + + async fn write( + &self, + request: Request, + ) -> std::result::Result, Status> { + self.write_internal(request).await + } +} + +async fn handle_stream_read( + ctx: HandlerContext, + request: ReadRequest, +) -> Result { + let read_request: table_engine::remote::model::ReadRequest = request + .try_into() + .map_err(|e| Box::new(e) as _) + .context(ErrWithCause { + code: StatusCode::BadRequest, + msg: "fail to convert read request", + })?; + + let table = find_table_by_identifier(&ctx, &read_request.table)?; + + let streams = table + .partitioned_read(read_request.read_request) + .await + .map_err(|e| Box::new(e) as _) + .context(ErrWithCause { + code: StatusCode::Internal, + msg: format!("fail to read table, table:{:?}", read_request.table), + })?; + + Ok(streams) +} + +async fn handle_write(ctx: HandlerContext, request: WriteRequest) -> Result { + let write_request: table_engine::remote::model::WriteRequest = request + .try_into() + .map_err(|e| Box::new(e) as _) + .context(ErrWithCause { + code: StatusCode::BadRequest, + msg: "fail to convert write request", + })?; + + let table = find_table_by_identifier(&ctx, &write_request.table)?; + + let affected_rows = table + .write(write_request.write_request) + .await + .map_err(|e| Box::new(e) as _) + .context(ErrWithCause { + code: StatusCode::Internal, + msg: format!("fail to write table, table:{:?}", write_request.table), + })?; + Ok(WriteResponse { + header: None, + affected_rows: affected_rows as u64, + }) +} + +fn find_table_by_identifier( + ctx: &HandlerContext, + table_identifier: &TableIdentifier, +) -> Result { + let catalog = ctx + .catalog_manager + .catalog_by_name(&table_identifier.catalog) + .map_err(|e| Box::new(e) as _) + .context(ErrWithCause { + code: StatusCode::Internal, + msg: format!("fail to get catalog, catalog:{}", table_identifier.catalog), + })? + .context(ErrNoCause { + code: StatusCode::NotFound, + msg: format!("catalog is not found, catalog:{}", table_identifier.catalog), + })?; + let schema = catalog + .schema_by_name(&table_identifier.schema) + .map_err(|e| Box::new(e) as _) + .context(ErrWithCause { + code: StatusCode::Internal, + msg: format!( + "fail to get schema of table, schema:{}", + table_identifier.schema + ), + })? + .context(ErrNoCause { + code: StatusCode::NotFound, + msg: format!( + "schema of table is not found, schema:{}", + table_identifier.schema + ), + })?; + + schema + .table_by_name(&table_identifier.table) + .map_err(|e| Box::new(e) as _) + .context(ErrWithCause { + code: StatusCode::Internal, + msg: format!("fail to get table, table:{}", table_identifier.table), + })? + .context(ErrNoCause { + code: StatusCode::NotFound, + msg: format!("table is not found, table:{}", table_identifier.table), + }) +} diff --git a/server/src/grpc/storage_service/mod.rs b/server/src/grpc/storage_service/mod.rs index 7ce10266b0..298dd9baff 100644 --- a/server/src/grpc/storage_service/mod.rs +++ b/server/src/grpc/storage_service/mod.rs @@ -363,7 +363,7 @@ impl StorageServiceImpl { error!("Failed to handle request, mod:stream_query, handler:handle_stream_query, err:{}", e); e })?; - if let Some(batch) = query::get_record_batch(&output) { + if let Some(batch) = query::get_record_batch(output) { for i in 0..batch.len() { let resp = query::convert_records(&batch[i..i + 1]); if tx.send(resp).await.is_err() { diff --git a/server/src/grpc/storage_service/query.rs b/server/src/grpc/storage_service/query.rs index aa165483cc..ade4bd181c 100644 --- a/server/src/grpc/storage_service/query.rs +++ b/server/src/grpc/storage_service/query.rs @@ -11,7 +11,7 @@ use ceresdbproto::{ }, }; use common_types::{record_batch::RecordBatch, request_id::RequestId}; -use common_util::time::InstantExt; +use common_util::{avro, time::InstantExt}; use futures::FutureExt; use http::StatusCode; use interpreters::{context::Context as InterpreterContext, factory::Factory, interpreter::Output}; @@ -25,7 +25,6 @@ use sql::{ use tonic::{transport::Channel, IntoRequest}; use crate::{ - avro_util, config::Endpoint, grpc::{ forward::{ForwardRequest, ForwardResult}, @@ -247,7 +246,7 @@ fn convert_output(output: &Output) -> Result { } } -pub fn get_record_batch(op: &Option) -> Option<&RecordBatchVec> { +pub fn get_record_batch(op: Option) -> Option { if let Some(output) = op { match output { Output::Records(records) => Some(records), @@ -270,27 +269,23 @@ pub fn convert_records(records: &[RecordBatch]) -> Result { let total_row = records.iter().map(|v| v.num_rows()).sum(); resp.rows = Vec::with_capacity(total_row); for record_batch in records { - let avro_schema = match avro_schema_opt.as_ref() { - Some(schema) => schema, - None => { - let avro_schema = avro_util::to_avro_schema(RECORD_NAME, record_batch.schema()); + if avro_schema_opt.as_ref().is_none() { + let avro_schema = avro::to_avro_schema(RECORD_NAME, record_batch.schema()); - // We only set schema_json once, so all record batches need to have same schema - resp.schema_type = query_response::SchemaType::Avro as i32; - resp.schema_content = avro_schema.canonical_form(); + // We only set schema_json once, so all record batches need to have same schema + resp.schema_type = query_response::SchemaType::Avro as i32; + resp.schema_content = avro_schema.canonical_form(); - avro_schema_opt = Some(avro_schema); - - avro_schema_opt.as_ref().unwrap() - } - }; + avro_schema_opt = Some(avro_schema); + } - avro_util::record_batch_to_avro(record_batch, avro_schema, &mut resp.rows) + let mut rows = avro::record_batch_to_avro_rows(record_batch) .map_err(|e| Box::new(e) as _) .context(ErrWithCause { code: StatusCode::INTERNAL_SERVER_ERROR, msg: "failed to convert record batch", })?; + resp.rows.append(&mut rows); } Ok(resp) diff --git a/server/src/lib.rs b/server/src/lib.rs index dc5ef663dc..1be4335f41 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -8,7 +8,6 @@ #[macro_use] extern crate common_util; -mod avro_util; pub mod config; mod consts; mod context; diff --git a/table_engine/Cargo.toml b/table_engine/Cargo.toml index d100268057..6b46d5427f 100644 --- a/table_engine/Cargo.toml +++ b/table_engine/Cargo.toml @@ -18,6 +18,7 @@ common_types = { workspace = true } common_util = { workspace = true } datafusion = { workspace = true } datafusion-expr = { workspace = true } +datafusion-proto = { workspace = true } df_operator = { workspace = true } futures = { workspace = true } itertools = "0.10.5" diff --git a/table_engine/src/predicate.rs b/table_engine/src/predicate.rs index db480930eb..20955ab6c1 100644 --- a/table_engine/src/predicate.rs +++ b/table_engine/src/predicate.rs @@ -13,8 +13,9 @@ use datafusion::{ logical_plan::{Expr, Operator}, scalar::ScalarValue, }; +use datafusion_proto::bytes::Serializeable; use log::debug; -use snafu::Snafu; +use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; #[derive(Debug, Snafu)] #[snafu(visibility = "pub")] @@ -23,6 +24,17 @@ pub enum Error { Prune { source: datafusion::error::DataFusionError, }, + + #[snafu(display("Empty time range.\nBacktrace:\n{}", backtrace))] + EmptyTimeRange { backtrace: Backtrace }, + + #[snafu(display("Invalid time range.\nBacktrace:\n{}", backtrace))] + InvalidTimeRange { backtrace: Backtrace }, + + #[snafu(display("Expr decode failed., err:{}", source))] + DecodeExpr { + source: Box, + }, } define_result!(Error); @@ -69,6 +81,29 @@ impl Predicate { } } +impl TryFrom for Predicate { + type Error = Error; + + fn try_from(pb: proto::remote_engine::Predicate) -> std::result::Result { + let time_range = pb.time_range.context(EmptyTimeRange)?; + let mut exprs = Vec::with_capacity(pb.exprs.len()); + for pb_expr in pb.exprs { + let expr = Expr::from_bytes(&pb_expr) + .map_err(|e| Box::new(e) as _) + .context(DecodeExpr)?; + exprs.push(expr); + } + Ok(Self { + exprs, + time_range: TimeRange::new( + Timestamp::new(time_range.start), + Timestamp::new(time_range.end), + ) + .context(InvalidTimeRange)?, + }) + } +} + /// Builder for [Predicate] #[derive(Debug, Clone, Default)] #[must_use] diff --git a/table_engine/src/remote/mod.rs b/table_engine/src/remote/mod.rs index 607d3a8ec9..502baaccb8 100644 --- a/table_engine/src/remote/mod.rs +++ b/table_engine/src/remote/mod.rs @@ -7,12 +7,46 @@ pub mod model; use async_trait::async_trait; use common_util::define_result; use model::{ReadRequest, WriteRequest}; -use snafu::Snafu; +use snafu::{Backtrace, Snafu}; use crate::stream::SendableRecordBatchStream; #[derive(Debug, Snafu)] -pub enum Error {} +pub enum Error { + #[snafu(display("Empty table identifier.\nBacktrace:\n{}", backtrace))] + EmptyTableIdentifier { backtrace: Backtrace }, + + #[snafu(display("Empty table read request.\nBacktrace:\n{}", backtrace))] + EmptyTableReadRequest { backtrace: Backtrace }, + + #[snafu(display("Empty table schema.\nBacktrace:\n{}", backtrace))] + EmptyTableSchema { backtrace: Backtrace }, + + #[snafu(display("Empty row group.\nBacktrace:\n{}", backtrace))] + EmptyRowGroup { backtrace: Backtrace }, + + #[snafu(display("Failed to covert table read request, err:{}", source))] + ConvertTableReadRequest { + source: Box, + }, + + #[snafu(display("Failed to covert table schema, err:{}", source))] + ConvertTableSchema { + source: Box, + }, + + #[snafu(display("Failed to covert row group, err:{}", source))] + ConvertRowGroup { + source: Box, + }, + + #[snafu(display( + "Failed to covert row group, encoding version:{}.\nBacktrace:\n{}", + version, + backtrace + ))] + UnsupportedConvertRowGroup { version: u32, backtrace: Backtrace }, +} define_result!(Error); diff --git a/table_engine/src/remote/model.rs b/table_engine/src/remote/model.rs index 9913a20e06..74e130d562 100644 --- a/table_engine/src/remote/model.rs +++ b/table_engine/src/remote/model.rs @@ -2,23 +2,90 @@ //! Model for remote table engine -use crate::table::{ReadRequest as TableReadRequest, WriteRequest as TableWriteRequest}; +use common_types::schema::Schema; +use common_util::avro; +use snafu::{OptionExt, ResultExt}; -#[allow(dead_code)] +use crate::{ + remote::{ + ConvertRowGroup, ConvertTableReadRequest, ConvertTableSchema, EmptyRowGroup, + EmptyTableIdentifier, EmptyTableReadRequest, EmptyTableSchema, Error, + UnsupportedConvertRowGroup, + }, + table::{ReadRequest as TableReadRequest, WriteRequest as TableWriteRequest}, +}; + +const ENCODE_ROWS_WITH_AVRO: u32 = 0; + +#[derive(Debug)] pub struct TableIdentifier { pub catalog: String, pub schema: String, pub table: String, } -#[allow(dead_code)] +impl From for TableIdentifier { + fn from(pb: proto::remote_engine::TableIdentifier) -> Self { + Self { + catalog: pb.catalog, + schema: pb.schema, + table: pb.table, + } + } +} + pub struct ReadRequest { pub table: TableIdentifier, - pub table_request: TableReadRequest, + pub read_request: TableReadRequest, +} + +impl TryFrom for ReadRequest { + type Error = Error; + + fn try_from(pb: proto::remote_engine::ReadRequest) -> Result { + let table_identifier = pb.table.context(EmptyTableIdentifier)?; + let table_read_request = pb.read_request.context(EmptyTableReadRequest)?; + Ok(Self { + table: table_identifier.into(), + read_request: table_read_request + .try_into() + .map_err(|e| Box::new(e) as _) + .context(ConvertTableReadRequest)?, + }) + } } #[allow(dead_code)] pub struct WriteRequest { pub table: TableIdentifier, - pub table_request: TableWriteRequest, + pub write_request: TableWriteRequest, +} + +impl TryFrom for WriteRequest { + type Error = Error; + + fn try_from(pb: proto::remote_engine::WriteRequest) -> Result { + let table_identifier = pb.table.context(EmptyTableIdentifier)?; + let row_group_pb = pb.row_group.context(EmptyRowGroup)?; + let table_schema: Schema = row_group_pb + .table_schema + .context(EmptyTableSchema)? + .try_into() + .map_err(|e| Box::new(e) as _) + .context(ConvertTableSchema)?; + let row_group = if row_group_pb.version == ENCODE_ROWS_WITH_AVRO { + avro::avro_rows_to_row_group(table_schema, &row_group_pb.rows) + .map_err(|e| Box::new(e) as _) + .context(ConvertRowGroup)? + } else { + UnsupportedConvertRowGroup { + version: row_group_pb.version, + } + .fail()? + }; + Ok(Self { + table: table_identifier.into(), + write_request: TableWriteRequest { row_group }, + }) + } } diff --git a/table_engine/src/table.rs b/table_engine/src/table.rs index 35756f3795..005352ba3c 100644 --- a/table_engine/src/table.rs +++ b/table_engine/src/table.rs @@ -22,7 +22,7 @@ use common_types::{ }; use proto::sys_catalog as sys_catalog_pb; use serde_derive::Deserialize; -use snafu::{Backtrace, Snafu}; +use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use crate::{ engine::TableState, @@ -124,6 +124,25 @@ pub enum Error { table: String, source: Box, }, + + #[snafu(display("Empty read options.\nBacktrace:\n{}", backtrace))] + EmptyReadOptions { backtrace: Backtrace }, + + #[snafu(display("Empty projected schema.\nBacktrace:\n{}", backtrace))] + EmptyProjectedSchema { backtrace: Backtrace }, + + #[snafu(display("Empty predicate.\nBacktrace:\n{}", backtrace))] + EmptyPredicate { backtrace: Backtrace }, + + #[snafu(display("Failed to covert projected schema, err:{}", source))] + ConvertProjectedSchema { + source: Box, + }, + + #[snafu(display("Failed to covert predicate, err:{}", source))] + ConvertPredicate { + source: Box, + }, } define_result!(Error); @@ -291,6 +310,24 @@ impl Default for ReadOptions { } } +impl From for ReadOptions { + fn from(pb: proto::remote_engine::ReadOptions) -> Self { + Self { + batch_size: pb.batch_size as usize, + read_parallelism: pb.read_parallelism as usize, + } + } +} + +impl From for proto::remote_engine::ReadOptions { + fn from(opts: ReadOptions) -> Self { + Self { + batch_size: opts.batch_size as u64, + read_parallelism: opts.read_parallelism as u64, + } + } +} + #[derive(Debug)] pub struct GetRequest { /// Query request id. @@ -350,6 +387,41 @@ pub struct ReadRequest { pub order: ReadOrder, } +impl TryFrom for ReadRequest { + type Error = Error; + + fn try_from(pb: proto::remote_engine::TableReadRequest) -> Result { + let opts = pb.opts.context(EmptyReadOptions)?.into(); + let projected_schema = pb + .projected_schema + .context(EmptyProjectedSchema)? + .try_into() + .map_err(|e| Box::new(e) as _) + .context(ConvertProjectedSchema)?; + let predicate = Arc::new( + pb.predicate + .context(EmptyPredicate)? + .try_into() + .map_err(|e| Box::new(e) as _) + .context(ConvertPredicate)?, + ); + let order = if pb.order == proto::remote_engine::ReadOrder::Asc as i32 { + ReadOrder::Asc + } else if pb.order == proto::remote_engine::ReadOrder::Desc as i32 { + ReadOrder::Desc + } else { + ReadOrder::None + }; + Ok(Self { + request_id: RequestId::next_id(), + opts, + projected_schema, + predicate, + order, + }) + } +} + #[derive(Debug)] pub struct AlterSchemaRequest { /// The new schema.