diff --git a/Cargo.lock b/Cargo.lock
index 42e9c168f7..421f342d8c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1302,9 +1302,9 @@ dependencies = [
 [[package]]
 name = "ceresdbproto"
-version = "1.0.9"
+version = "1.0.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "25926e49d9d931b3089b26aba55cd5057631db452137f45d0d24f8b5dae8a28c"
+checksum = "ea8a61d72d30452b689e761344d9502bcc5feb2dbd06f08b507b2e164b549aee"
 dependencies = [
  "prost",
  "protoc-bin-vendored",
diff --git a/Cargo.toml b/Cargo.toml
index f357cd1ee4..ee4b86ca90 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -97,7 +97,7 @@ bytes = "1.1.0"
 bytes_ext = { path = "components/bytes_ext" }
 catalog = { path = "catalog" }
 catalog_impls = { path = "catalog_impls" }
-ceresdbproto = "1.0"
+ceresdbproto = "1.0.10"
 codec = { path = "components/codec" }
 chrono = "0.4"
 clap = "3.0"
diff --git a/common_types/src/column_schema.rs b/common_types/src/column_schema.rs
index 81f53373c2..9ecac77d04 100644
--- a/common_types/src/column_schema.rs
+++ b/common_types/src/column_schema.rs
@@ -346,7 +346,7 @@ impl TryFrom<&Arc<Field>> for ColumnSchema {
 impl From<&ColumnSchema> for Field {
     fn from(col_schema: &ColumnSchema) -> Self {
         let metadata = encode_arrow_field_meta_data(col_schema);
-        // If the column sholud use dictionary, create correspond dictionary type.
+        // If the column should use a dictionary, create the corresponding dictionary type.
         let mut field = if col_schema.is_dictionary {
             Field::new_dict(
                 &col_schema.name,
diff --git a/common_types/src/datum.rs b/common_types/src/datum.rs
index 2c89efcc7b..04fa2b683b 100644
--- a/common_types/src/datum.rs
+++ b/common_types/src/datum.rs
@@ -1177,6 +1177,28 @@ impl<'a> DatumView<'a> {
             _ => None,
         }
     }
+
+    pub fn to_datum(&self) -> Datum {
+        match self {
+            DatumView::Null => Datum::Null,
+            DatumView::Timestamp(v) => Datum::Timestamp(*v),
+            DatumView::Double(v) => Datum::Double(*v),
+            DatumView::Float(v) => Datum::Float(*v),
+            DatumView::Varbinary(v) => Datum::Varbinary(Bytes::from(v.to_vec())),
+            DatumView::String(v) => Datum::String(StringBytes::copy_from_str(v)),
+            DatumView::UInt64(v) => Datum::UInt64(*v),
+            DatumView::UInt32(v) => Datum::UInt32(*v),
+            DatumView::UInt16(v) => Datum::UInt16(*v),
+            DatumView::UInt8(v) => Datum::UInt8(*v),
+            DatumView::Int64(v) => Datum::Int64(*v),
+            DatumView::Int32(v) => Datum::Int32(*v),
+            DatumView::Int16(v) => Datum::Int16(*v),
+            DatumView::Int8(v) => Datum::Int8(*v),
+            DatumView::Boolean(v) => Datum::Boolean(*v),
+            DatumView::Date(v) => Datum::Date(*v),
+            DatumView::Time(v) => Datum::Time(*v),
+        }
+    }
 }
 
 impl<'a> std::hash::Hash for DatumView<'a> {
diff --git a/remote_engine_client/src/client.rs b/remote_engine_client/src/client.rs
index 0974983c20..1ae1fc9039 100644
--- a/remote_engine_client/src/client.rs
+++ b/remote_engine_client/src/client.rs
@@ -121,11 +121,9 @@ impl Client {
         // Write to remote.
         let table_ident = request.table.clone();
-        let request_pb = WriteRequest::convert_to_pb(request, self.compression)
-            .box_err()
-            .context(Convert {
-                msg: "Failed to convert WriteRequest to pb",
-            })?;
+        let request_pb = request.convert_into_pb().box_err().context(Convert {
+            msg: "Failed to convert WriteRequest to pb",
+        })?;
 
         let mut rpc_client = RemoteEngineServiceClient::<Channel>::new(route_context.channel);
 
         let result = rpc_client
@@ -184,16 +182,10 @@ impl Client {
             request,
             channel,
         } = context;
-        let compress_options = self.compression;
+        let batch_request_pb = request.convert_into_pb().box_err().context(Convert {
+            msg: "Failed to convert WriteBatchRequest to pb",
+        })?;
         let handle = self.io_runtime.spawn(async move {
-            let batch_request_pb =
-                match WriteBatchRequest::convert_write_batch_to_pb(request, compress_options)
-                    .box_err()
-                {
-                    Ok(pb) => pb,
-                    Err(e) => return Err(e),
-                };
-
             let mut rpc_client = RemoteEngineServiceClient::<Channel>::new(channel);
             let rpc_result = rpc_client
                 .write_batch(Request::new(batch_request_pb))
diff --git a/server/src/grpc/metrics.rs b/server/src/grpc/metrics.rs
index 04c982bb8c..9085b58660 100644
--- a/server/src/grpc/metrics.rs
+++ b/server/src/grpc/metrics.rs
@@ -16,8 +16,8 @@
 use lazy_static::lazy_static;
 use prometheus::{
-    exponential_buckets, register_histogram_vec, register_int_counter_vec, HistogramVec,
-    IntCounterVec,
+    exponential_buckets, register_histogram, register_histogram_vec, register_int_counter_vec,
+    Histogram, HistogramVec, IntCounterVec,
 };
 use prometheus_static_metric::{auto_flush_from, make_auto_flush_static_metric};
 
@@ -102,6 +102,12 @@ lazy_static! {
         &["type"]
     )
     .unwrap();
+    pub static ref REMOTE_ENGINE_WRITE_BATCH_NUM_ROWS_HISTOGRAM: Histogram = register_histogram!(
+        "remote_engine_write_batch_num_rows",
+        "Bucketed histogram of the number of rows in a remote engine write batch",
+        vec![1.0, 10.0, 50.0, 100.0, 500.0, 1000.0, 2000.0]
+    )
+    .unwrap();
     pub static ref META_EVENT_GRPC_HANDLER_DURATION_HISTOGRAM_VEC_GLOBAL: HistogramVec =
         register_histogram_vec!(
             "meta_event_grpc_handler_duration",
diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs
index 69a4861d6a..a2cdb1a004 100644
--- a/server/src/grpc/remote_engine_service/mod.rs
+++ b/server/src/grpc/remote_engine_service/mod.rs
@@ -21,7 +21,7 @@ use async_trait::async_trait;
 use catalog::{manager::ManagerRef, schema::SchemaRef};
 use ceresdbproto::{
     remote_engine::{
-        read_response::Output::Arrow, remote_engine_service_server::RemoteEngineService,
+        read_response::Output::Arrow, remote_engine_service_server::RemoteEngineService, row_group,
         GetTableInfoRequest, GetTableInfoResponse, ReadRequest, ReadResponse, WriteBatchRequest,
         WriteRequest, WriteResponse,
     },
@@ -38,14 +38,18 @@ use proxy::{
 use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner};
 use snafu::{OptionExt, ResultExt};
 use table_engine::{
-    engine::EngineRuntimes, predicate::PredicateRef, remote::model::TableIdentifier,
-    stream::PartitionedStreams, table::TableRef,
+    engine::EngineRuntimes,
+    predicate::PredicateRef,
+    remote::model::{self, TableIdentifier},
+    stream::PartitionedStreams,
+    table::TableRef,
 };
 use time_ext::InstantExt;
 use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
 use tonic::{Request, Response, Status};
 
+use super::metrics::REMOTE_ENGINE_WRITE_BATCH_NUM_ROWS_HISTOGRAM;
 use crate::{
     dedup_requests::{RequestNotifiers, RequestResult},
     grpc::{
@@ -604,19 +608,56 @@ async fn record_write(
 }
 
 async fn handle_write(ctx: HandlerContext, request: WriteRequest) -> Result<WriteResponse> {
-    let write_request: table_engine::remote::model::WriteRequest =
-        request.try_into().box_err().context(ErrWithCause {
+    let table_ident: TableIdentifier = request
+        .table
+        .context(ErrNoCause {
+            code: StatusCode::BadRequest,
+            msg: "missing table ident",
+        })?
+        .into();
+
+    let rows_payload = request
+        .row_group
+        .context(ErrNoCause {
+            code: StatusCode::BadRequest,
+            msg: "missing row group payload",
+        })?
+        .rows
+        .context(ErrNoCause {
             code: StatusCode::BadRequest,
-            msg: "fail to convert write request",
+            msg: "missing rows payload",
         })?;
 
+    let table = find_table_by_identifier(&ctx, &table_ident)?;
+    let write_request = match rows_payload {
+        row_group::Rows::Arrow(_) => {
+            // The payload encoded in arrow format won't be accepted anymore.
+            return ErrNoCause {
+                code: StatusCode::BadRequest,
+                msg: "payload encoded in arrow format is not supported anymore",
+            }
+            .fail();
+        }
+        row_group::Rows::Contiguous(payload) => {
+            let schema = table.schema();
+            let row_group =
+                model::WriteRequest::decode_row_group_from_contiguous_payload(payload, &schema)
+                    .box_err()
+                    .context(ErrWithCause {
+                        code: StatusCode::BadRequest,
+                        msg: "failed to decode row group payload",
+                    })?;
+            model::WriteRequest::new(table_ident, row_group)
+        }
+    };
+
     // In theory we should record the write request at the beginning of the
     // server's handling, but the payload is encoded, so we cannot record it
     // until the payload is decoded here.
     record_write(&ctx.hotspot_recorder, &write_request).await;
 
     let num_rows = write_request.write_request.row_group.num_rows();
-    let table = find_table_by_identifier(&ctx, &write_request.table)?;
+    REMOTE_ENGINE_WRITE_BATCH_NUM_ROWS_HISTOGRAM.observe(num_rows as f64);
 
     let res = table
         .write(write_request.write_request)
diff --git a/table_engine/src/remote/model.rs b/table_engine/src/remote/model.rs
index 3dd7091331..3aea5789bb 100644
--- a/table_engine/src/remote/model.rs
+++ b/table_engine/src/remote/model.rs
@@ -16,23 +16,18 @@
 use std::collections::HashMap;
 
-use arrow_ext::{
-    ipc,
-    ipc::{CompressOptions, CompressionMethod},
-};
-use ceresdbproto::{
-    remote_engine,
-    remote_engine::row_group::Rows::Arrow,
-    storage::{arrow_payload, ArrowPayload},
-};
+use bytes_ext::ByteVec;
+use ceresdbproto::remote_engine::{self, row_group::Rows::Contiguous};
 use common_types::{
-    record_batch::{RecordBatch, RecordBatchWithKeyBuilder},
-    row::{RowGroup, RowGroupBuilder},
-    schema::Schema,
+    row::{
+        contiguous::{ContiguousRow, ContiguousRowReader, ContiguousRowWriter},
+        Row, RowGroup, RowGroupBuilder,
+    },
+    schema::{IndexInWriterSchema, Schema},
 };
 use generic_error::{BoxError, GenericError, GenericResult};
 use macros::define_result;
-use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
+use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
 
 use crate::{
     partition::PartitionInfo,
@@ -47,15 +42,11 @@ pub enum Error {
     ReadRequestToPb { source: crate::table::Error },
 
     #[snafu(display(
-        "Failed to convert write request to pb, table_ident:{:?}, msg:{}.\nBacktrace:\n{}",
-        table_ident,
-        msg,
-        backtrace
+        "Failed to convert write request to pb, table_ident:{table_ident:?}, err:{source}",
     ))]
-    WriteRequestToPbNoCause {
+    WriteRequestToPb {
         table_ident: TableIdentifier,
-        msg: String,
-        backtrace: Backtrace,
+        source: common_types::row::contiguous::Error,
     },
 
     #[snafu(display("Empty table identifier.\nBacktrace:\n{}", backtrace))]
@@ -120,9 +111,7 @@ pub struct ReadRequest {
 impl TryFrom<ceresdbproto::remote_engine::ReadRequest> for ReadRequest {
     type Error = Error;
 
-    fn try_from(
-        pb: ceresdbproto::remote_engine::ReadRequest,
-    ) -> std::result::Result<Self, Self::Error> {
+    fn try_from(pb: ceresdbproto::remote_engine::ReadRequest) -> Result<Self> {
         let table_identifier = pb.table.context(EmptyTableIdentifier)?;
         let table_read_request = pb.read_request.context(EmptyTableReadRequest)?;
         Ok(Self {
@@ -138,7 +127,7 @@ impl TryFrom<ceresdbproto::remote_engine::ReadRequest> for ReadRequest {
 impl TryFrom<ReadRequest> for ceresdbproto::remote_engine::ReadRequest {
     type Error = Error;
 
-    fn try_from(request: ReadRequest) -> std::result::Result<Self, Self::Error> {
+    fn try_from(request: ReadRequest) -> Result<Self> {
         let table_pb = request.table.into();
         let request_pb = request.read_request.try_into().context(ReadRequestToPb)?;
 
@@ -154,32 +143,13 @@ pub struct WriteBatchRequest {
     pub batch: Vec<WriteRequest>,
 }
 
-impl TryFrom<ceresdbproto::remote_engine::WriteBatchRequest> for WriteBatchRequest {
-    type Error = Error;
-
-    fn try_from(
-        pb: ceresdbproto::remote_engine::WriteBatchRequest,
-    ) -> std::result::Result<Self, Self::Error> {
-        let batch = pb
-            .batch
-            .into_iter()
-            .map(WriteRequest::try_from)
-            .collect::<std::result::Result<Vec<WriteRequest>, Self::Error>>()?;
-
-        Ok(WriteBatchRequest { batch })
-    }
-}
-
 impl WriteBatchRequest {
-    pub fn convert_write_batch_to_pb(
-        batch_request: WriteBatchRequest,
-        compress_options: CompressOptions,
-    ) -> std::result::Result<remote_engine::WriteBatchRequest, Error> {
-        let batch = batch_request
+    pub fn convert_into_pb(self) -> Result<remote_engine::WriteBatchRequest> {
+        let batch = self
             .batch
             .into_iter()
-            .map(|req| WriteRequest::convert_to_pb(req, compress_options))
-            .collect::<std::result::Result<Vec<_>, Error>>()?;
+            .map(|req| req.convert_into_pb())
+            .collect::<Result<Vec<_>>>()?;
 
         Ok(remote_engine::WriteBatchRequest { batch })
     }
@@ -190,89 +160,68 @@ pub struct WriteRequest {
     pub write_request: TableWriteRequest,
 }
 
-impl TryFrom<ceresdbproto::remote_engine::WriteRequest> for WriteRequest {
-    type Error = Error;
+impl WriteRequest {
+    pub fn new(table_ident: TableIdentifier, row_group: RowGroup) -> Self {
+        Self {
+            table: table_ident,
+            write_request: TableWriteRequest { row_group },
+        }
+    }
 
-    fn try_from(
-        pb: ceresdbproto::remote_engine::WriteRequest,
-    ) -> std::result::Result<Self, Self::Error> {
-        let table_identifier = pb.table.context(EmptyTableIdentifier)?;
-        let row_group_pb = pb.row_group.context(EmptyRowGroup)?;
-        let rows = row_group_pb.rows.context(EmptyRowGroup)?;
-        let row_group = match rows {
-            Arrow(v) => {
-                ensure!(!v.record_batches.is_empty(), EmptyRecordBatch);
-
-                let compression = match v.compression() {
-                    arrow_payload::Compression::None => CompressionMethod::None,
-                    arrow_payload::Compression::Zstd => CompressionMethod::Zstd,
-                };
-
-                let mut record_batch_vec = vec![];
-                for data in v.record_batches {
-                    let mut arrow_record_batch_vec = ipc::decode_record_batches(data, compression)
-                        .map_err(|e| Box::new(e) as _)
-                        .context(ConvertRowGroup)?;
-                    record_batch_vec.append(&mut arrow_record_batch_vec);
-                }
-
-                build_row_group_from_record_batch(record_batch_vec)?
-            }
-        };
+    pub fn decode_row_group_from_contiguous_payload(
+        payload: ceresdbproto::remote_engine::ContiguousRows,
+        schema: &Schema,
+    ) -> Result<RowGroup> {
+        let mut row_group_builder =
+            RowGroupBuilder::with_capacity(schema.clone(), payload.encoded_rows.len());
+        for encoded_row in payload.encoded_rows {
+            let reader = ContiguousRowReader::try_new(&encoded_row, schema)
+                .box_err()
+                .context(ConvertRowGroup)?;
 
-        Ok(Self {
-            table: table_identifier.into(),
-            write_request: TableWriteRequest { row_group },
-        })
+            let mut datums = Vec::with_capacity(schema.num_columns());
+            for (index, column_schema) in schema.columns().iter().enumerate() {
+                let datum_view = reader.datum_view_at(index, &column_schema.data_type);
+                // TODO: The clone can be avoided if we can encode the final payload directly
+                // from the DatumView.
+                datums.push(datum_view.to_datum());
+            }
+            row_group_builder.push_checked_row(Row::from_datums(datums));
+        }
+        Ok(row_group_builder.build())
     }
-}
 
-impl WriteRequest {
-    pub fn convert_to_pb(
-        request: WriteRequest,
-        compress_options: CompressOptions,
-    ) -> std::result::Result<ceresdbproto::remote_engine::WriteRequest, Error> {
-        // Row group to pb.
-        let row_group = request.write_request.row_group;
+    pub fn convert_into_pb(self) -> Result<ceresdbproto::remote_engine::WriteRequest> {
+        let row_group = self.write_request.row_group;
         let table_schema = row_group.schema();
         let min_timestamp = row_group.min_timestamp().as_i64();
         let max_timestamp = row_group.max_timestamp().as_i64();
 
-        let mut builder = RecordBatchWithKeyBuilder::new(table_schema.to_record_schema_with_key());
-
-        for row in row_group {
-            builder
-                .append_row(row)
-                .map_err(|e| Box::new(e) as _)
-                .context(ConvertRowGroup)?;
+        let mut encoded_rows = Vec::with_capacity(row_group.num_rows());
+        // TODO: The schema of the written row group may be different from the
+        // original one, so compatibility between the two schemas should be considered.
+        let index_in_schema = IndexInWriterSchema::for_same_schema(table_schema.num_columns());
+        for row in &row_group {
+            let mut buf = ByteVec::new();
+            let mut writer = ContiguousRowWriter::new(&mut buf, table_schema, &index_in_schema);
+            writer.write_row(row).with_context(|| WriteRequestToPb {
+                table_ident: self.table.clone(),
+            })?;
+            encoded_rows.push(buf);
         }
-        let record_batch_with_key = builder
-            .build()
-            .map_err(|e| Box::new(e) as _)
-            .context(ConvertRowGroup)?;
-        let record_batch = record_batch_with_key.into_record_batch();
-        let compress_output =
-            ipc::encode_record_batch(&record_batch.into_arrow_record_batch(), compress_options)
-                .map_err(|e| Box::new(e) as _)
-                .context(ConvertRowGroup)?;
-
-        let compression = match compress_output.method {
-            CompressionMethod::None => arrow_payload::Compression::None,
-            CompressionMethod::Zstd => arrow_payload::Compression::Zstd,
+        let contiguous_rows = ceresdbproto::remote_engine::ContiguousRows {
+            schema_version: table_schema.version(),
+            encoded_rows,
         };
-
         let row_group_pb = ceresdbproto::remote_engine::RowGroup {
             min_timestamp,
             max_timestamp,
-            rows: Some(Arrow(ArrowPayload {
-                record_batches: vec![compress_output.payload],
-                compression: compression as i32,
-            })),
+            rows: Some(Contiguous(contiguous_rows)),
         };
 
         // Table ident to pb.
-        let table_pb = request.table.into();
+        let table_pb = self.table.into();
 
         Ok(ceresdbproto::remote_engine::WriteRequest {
             table: Some(table_pb),
@@ -293,9 +242,7 @@ pub struct GetTableInfoRequest {
 impl TryFrom<ceresdbproto::remote_engine::GetTableInfoRequest> for GetTableInfoRequest {
     type Error = Error;
 
-    fn try_from(
-        value: ceresdbproto::remote_engine::GetTableInfoRequest,
-    ) -> std::result::Result<Self, Self::Error> {
+    fn try_from(value: ceresdbproto::remote_engine::GetTableInfoRequest) -> Result<Self> {
         let table = value.table.context(EmptyTableIdentifier)?.into();
         Ok(Self { table })
     }
@@ -304,7 +251,7 @@ impl TryFrom<GetTableInfoRequest> for ceresdbproto::remote_engine::GetTableInfoR
 impl TryFrom<GetTableInfoRequest> for ceresdbproto::remote_engine::GetTableInfoRequest {
     type Error = Error;
 
-    fn try_from(value: GetTableInfoRequest) -> std::result::Result<Self, Self::Error> {
+    fn try_from(value: GetTableInfoRequest) -> Result<Self> {
         let table = value.table.into();
         Ok(Self { table: Some(table) })
     }
@@ -330,43 +277,3 @@ pub struct TableInfo {
     /// Partition Info
     pub partition_info: Option<PartitionInfo>,
 }
-
-fn build_row_group_from_record_batch(
-    record_batches: Vec<ArrowRecordBatch>,
-) -> Result<RowGroup> {
-    ensure!(!record_batches.is_empty(), EmptyRecordBatch);
-
-    let mut row_group_builder = RowGroupBuilder::new(
-        record_batches[0]
-            .schema()
-            .try_into()
-            .map_err(|e| Box::new(e) as _)
-            .context(ConvertRowGroup)?,
-    );
-
-    for record_batch in record_batches {
-        let record_batch: RecordBatch = record_batch
-            .try_into()
-            .map_err(|e| Box::new(e) as _)
-            .context(ConvertRowGroup)?;
-
-        let num_cols = record_batch.num_columns();
-        let num_rows = record_batch.num_rows();
-        for row_idx in 0..num_rows {
-            let mut row_builder = row_group_builder.row_builder();
-            for col_idx in 0..num_cols {
-                let val = record_batch.column(col_idx).datum(row_idx);
-                row_builder = row_builder
-                    .append_datum(val)
-                    .map_err(|e| Box::new(e) as _)
-                    .context(ConvertRowGroup)?;
-            }
-            row_builder
-                .finish()
-                .map_err(|e| Box::new(e) as _)
-                .context(ConvertRowGroup)?;
-        }
-    }
-
-    Ok(row_group_builder.build())
-}
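
Reviewer note: below is a minimal sketch of the encode/decode round trip this patch introduces, assuming it sits in a test module of table_engine::remote::model where WriteRequest, TableIdentifier, RowGroup, Result, and the Contiguous variant are already in scope; the helper name roundtrip_row_group is illustrative and not part of the patch.

    // Encode a RowGroup into the new contiguous-row pb payload (client side),
    // then decode it back (server side).
    fn roundtrip_row_group(table_ident: TableIdentifier, row_group: RowGroup) -> Result<RowGroup> {
        // The server decodes against the schema of the target table; this sketch
        // reuses the writer schema, which the patch assumes to be the same (see
        // the TODO about schema compatibility in convert_into_pb).
        let schema = row_group.schema().clone();

        // Client side: each row is serialized by ContiguousRowWriter and the
        // buffers are collected into ContiguousRows { schema_version, encoded_rows }.
        let request_pb = WriteRequest::new(table_ident, row_group).convert_into_pb()?;

        // Server side: only the Contiguous payload is accepted; Arrow payloads
        // are now rejected with BadRequest in handle_write.
        match request_pb.row_group.and_then(|rg_pb| rg_pb.rows) {
            Some(Contiguous(payload)) => {
                WriteRequest::decode_row_group_from_contiguous_payload(payload, &schema)
            }
            _ => unreachable!("convert_into_pb always emits contiguous rows"),
        }
    }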