refactor: use new protocol for remote engine service write #1146

Merged · 3 commits · Aug 14, 2023
4 changes: 2 additions & 2 deletions Cargo.lock

(Generated file; diff not rendered.)

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -97,7 +97,7 @@ bytes = "1.1.0"
 bytes_ext = { path = "components/bytes_ext" }
 catalog = { path = "catalog" }
 catalog_impls = { path = "catalog_impls" }
-ceresdbproto = "1.0"
+ceresdbproto = "1.0.10"
 codec = { path = "components/codec" }
 chrono = "0.4"
 clap = "3.0"
2 changes: 1 addition & 1 deletion common_types/src/column_schema.rs
@@ -346,7 +346,7 @@ impl TryFrom<&Arc<Field>> for ColumnSchema {
 impl From<&ColumnSchema> for Field {
     fn from(col_schema: &ColumnSchema) -> Self {
         let metadata = encode_arrow_field_meta_data(col_schema);
-        // If the column sholud use dictionary, create correspond dictionary type.
+        // If the column should use a dictionary, create the corresponding dictionary type.
         let mut field = if col_schema.is_dictionary {
             Field::new_dict(
                 &col_schema.name,
22 changes: 22 additions & 0 deletions common_types/src/datum.rs
@@ -1177,6 +1177,28 @@ impl<'a> DatumView<'a> {
             _ => None,
         }
     }
+
+    pub fn to_datum(&self) -> Datum {
+        match self {
+            DatumView::Null => Datum::Null,
+            DatumView::Timestamp(v) => Datum::Timestamp(*v),
+            DatumView::Double(v) => Datum::Double(*v),
+            DatumView::Float(v) => Datum::Float(*v),
+            DatumView::Varbinary(v) => Datum::Varbinary(Bytes::from(v.to_vec())),
+            DatumView::String(v) => Datum::String(StringBytes::copy_from_str(v)),
+            DatumView::UInt64(v) => Datum::UInt64(*v),
+            DatumView::UInt32(v) => Datum::UInt32(*v),
+            DatumView::UInt16(v) => Datum::UInt16(*v),
+            DatumView::UInt8(v) => Datum::UInt8(*v),
+            DatumView::Int64(v) => Datum::Int64(*v),
+            DatumView::Int32(v) => Datum::Int32(*v),
+            DatumView::Int16(v) => Datum::Int16(*v),
+            DatumView::Int8(v) => Datum::Int8(*v),
+            DatumView::Boolean(v) => Datum::Boolean(*v),
+            DatumView::Date(v) => Datum::Date(*v),
+            DatumView::Time(v) => Datum::Time(*v),
+        }
+    }
 }

 impl<'a> std::hash::Hash for DatumView<'a> {
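
The new to_datum method converts a borrowed DatumView into an owned Datum, copying the Varbinary and String payloads. A minimal usage sketch, assuming the common_types::datum exports shown above (the materialize helper is hypothetical):

use common_types::datum::{Datum, DatumView};

// Materialize owned values from borrowed views, e.g. when decoded rows must
// outlive the buffer the views borrow from.
fn materialize(views: &[DatumView<'_>]) -> Vec<Datum> {
    views.iter().map(DatumView::to_datum).collect()
}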
20 changes: 6 additions & 14 deletions remote_engine_client/src/client.rs
@@ -121,11 +121,9 @@ impl Client {

         // Write to remote.
         let table_ident = request.table.clone();
-        let request_pb = WriteRequest::convert_to_pb(request, self.compression)
-            .box_err()
-            .context(Convert {
-                msg: "Failed to convert WriteRequest to pb",
-            })?;
+        let request_pb = request.convert_into_pb().box_err().context(Convert {
+            msg: "Failed to convert WriteRequest to pb",
+        })?;
         let mut rpc_client = RemoteEngineServiceClient::<Channel>::new(route_context.channel);

         let result = rpc_client
@@ -184,16 +182,10 @@
             request,
             channel,
         } = context;
-        let compress_options = self.compression;
+        let batch_request_pb = request.convert_into_pb().box_err().context(Convert {
+            msg: "failed to convert request to pb",
+        })?;
         let handle = self.io_runtime.spawn(async move {
-            let batch_request_pb =
-                match WriteBatchRequest::convert_write_batch_to_pb(request, compress_options)
-                    .box_err()
-                {
-                    Ok(pb) => pb,
-                    Err(e) => return Err(e),
-                };
-
             let mut rpc_client = RemoteEngineServiceClient::<Channel>::new(channel);
             let rpc_result = rpc_client
                 .write_batch(Request::new(batch_request_pb))
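
Both write paths now call convert_into_pb on the request itself, dropping the compression options, and the batch path converts before spawning onto the IO runtime. A minimal sketch of that convert-then-spawn pattern, with hypothetical stand-in types rather than the real client API:

use tokio::runtime::Handle;

// Stand-ins for the request and its encoded protobuf form.
struct WriteReq(String);
struct WriteReqPb(Vec<u8>);

fn convert_into_pb(req: WriteReq) -> Result<WriteReqPb, String> {
    Ok(WriteReqPb(req.0.into_bytes()))
}

async fn write_batch(rt: &Handle, req: WriteReq) -> Result<(), String> {
    // Convert before spawning: encoding failures return immediately, and the
    // spawned IO future owns only the ready-to-send payload.
    let pb = convert_into_pb(req)?;
    let handle = rt.spawn(async move {
        // ... send `pb` over the gRPC channel here ...
        drop(pb);
        Ok::<(), String>(())
    });
    handle.await.map_err(|e| e.to_string())?
}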
10 changes: 8 additions & 2 deletions server/src/grpc/metrics.rs
@@ -16,8 +16,8 @@

 use lazy_static::lazy_static;
 use prometheus::{
-    exponential_buckets, register_histogram_vec, register_int_counter_vec, HistogramVec,
-    IntCounterVec,
+    exponential_buckets, register_histogram, register_histogram_vec, register_int_counter_vec,
+    Histogram, HistogramVec, IntCounterVec,
 };
 use prometheus_static_metric::{auto_flush_from, make_auto_flush_static_metric};

@@ -102,6 +102,12 @@ lazy_static! {
         &["type"]
     )
     .unwrap();
+    pub static ref REMOTE_ENGINE_WRITE_BATCH_NUM_ROWS_HISTOGRAM: Histogram = register_histogram!(
+        "remote_engine_write_batch_num_rows",
+        "Bucketed histogram of the number of rows per remote engine write batch",
+        vec![1.0, 10.0, 50.0, 100.0, 500.0, 1000.0, 2000.0]
+    )
+    .unwrap();
     pub static ref META_EVENT_GRPC_HANDLER_DURATION_HISTOGRAM_VEC_GLOBAL: HistogramVec =
         register_histogram_vec!(
             "meta_event_grpc_handler_duration",
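
The new histogram uses explicit bucket bounds instead of the exponential_buckets helper used elsewhere in this file, since batch row counts cluster at round sizes. Recording a sample is a single observe call; a minimal sketch (the observe_batch wrapper is hypothetical):

// Record the row count of one write batch. Prometheus counts the sample into
// every cumulative bucket whose upper bound covers it (1, 10, ..., 2000, +Inf).
fn observe_batch(num_rows: usize) {
    REMOTE_ENGINE_WRITE_BATCH_NUM_ROWS_HISTOGRAM.observe(num_rows as f64);
}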
55 changes: 48 additions & 7 deletions server/src/grpc/remote_engine_service/mod.rs
@@ -21,7 +21,7 @@ use async_trait::async_trait;
 use catalog::{manager::ManagerRef, schema::SchemaRef};
 use ceresdbproto::{
     remote_engine::{
-        read_response::Output::Arrow, remote_engine_service_server::RemoteEngineService,
+        read_response::Output::Arrow, remote_engine_service_server::RemoteEngineService, row_group,
         GetTableInfoRequest, GetTableInfoResponse, ReadRequest, ReadResponse, WriteBatchRequest,
         WriteRequest, WriteResponse,
     },
@@ -38,14 +38,18 @@ use proxy::{
 use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner};
 use snafu::{OptionExt, ResultExt};
 use table_engine::{
-    engine::EngineRuntimes, predicate::PredicateRef, remote::model::TableIdentifier,
-    stream::PartitionedStreams, table::TableRef,
+    engine::EngineRuntimes,
+    predicate::PredicateRef,
+    remote::model::{self, TableIdentifier},
+    stream::PartitionedStreams,
+    table::TableRef,
 };
 use time_ext::InstantExt;
 use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
 use tonic::{Request, Response, Status};

+use super::metrics::REMOTE_ENGINE_WRITE_BATCH_NUM_ROWS_HISTOGRAM;
 use crate::{
     dedup_requests::{RequestNotifiers, RequestResult},
     grpc::{
@@ -604,19 +608,56 @@ async fn record_write(
 }

 async fn handle_write(ctx: HandlerContext, request: WriteRequest) -> Result<WriteResponse> {
-    let write_request: table_engine::remote::model::WriteRequest =
-        request.try_into().box_err().context(ErrWithCause {
-            code: StatusCode::BadRequest,
-            msg: "fail to convert write request",
-        })?;
+    let table_ident: TableIdentifier = request
+        .table
+        .context(ErrNoCause {
+            code: StatusCode::BadRequest,
+            msg: "missing table ident",
+        })?
+        .into();
+
+    let rows_payload = request
+        .row_group
+        .context(ErrNoCause {
+            code: StatusCode::BadRequest,
+            msg: "missing row group payload",
+        })?
+        .rows
+        .context(ErrNoCause {
+            code: StatusCode::BadRequest,
+            msg: "missing rows payload",
+        })?;
+
+    let table = find_table_by_identifier(&ctx, &table_ident)?;
+    let write_request = match rows_payload {
+        row_group::Rows::Arrow(_) => {
+            // The payload encoded in arrow format won't be accepted anymore.
+            return ErrNoCause {
+                code: StatusCode::BadRequest,
+                msg: "payload encoded in arrow format is not supported anymore",
+            }
+            .fail();
+        }
+        row_group::Rows::Contiguous(payload) => {
+            let schema = table.schema();
+            let row_group =
+                model::WriteRequest::decode_row_group_from_contiguous_payload(payload, &schema)
+                    .box_err()
+                    .context(ErrWithCause {
+                        code: StatusCode::BadRequest,
+                        msg: "failed to decode row group payload",
+                    })?;
+            model::WriteRequest::new(table_ident, row_group)
+        }
+    };
+
+    // In theory we should record the write request at the beginning of the
+    // server's handling, but the payload arrives encoded, so we cannot record
+    // it until it has been decoded here.
     record_write(&ctx.hotspot_recorder, &write_request).await;

-    let table = find_table_by_identifier(&ctx, &write_request.table)?;
+    let num_rows = write_request.write_request.row_group.num_rows();
+    REMOTE_ENGINE_WRITE_BATCH_NUM_ROWS_HISTOGRAM.observe(num_rows as f64);

     let res = table
         .write(write_request.write_request)
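
The rewritten handler makes the new protocol explicit: resolve the table first so its schema is available, decode only contiguous payloads into a row group, and reject arrow payloads outright. A condensed sketch of that dispatch, using hypothetical stand-in types rather than the real proto and model types:

// Stand-ins for the proto payload variants, table schema, and decoded rows.
enum Rows {
    Arrow(Vec<u8>),
    Contiguous(Vec<u8>),
}
struct Schema;
struct RowGroup {
    num_rows: usize,
}

fn decode_contiguous(payload: &[u8], _schema: &Schema) -> Result<RowGroup, String> {
    // The real decoder validates the payload against the table schema.
    Ok(RowGroup { num_rows: payload.len() })
}

fn decode_rows(payload: Rows, schema: &Schema) -> Result<RowGroup, String> {
    match payload {
        // Arrow-encoded payloads are no longer accepted by the server.
        Rows::Arrow(_) => Err("arrow payload is not supported anymore".into()),
        Rows::Contiguous(bytes) => decode_contiguous(&bytes, schema),
    }
}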