feat: impl remote_engine grpc service (apache#508)
* feat: impl remote_engine grpc service

* chore: refactor code

* chore: refactor code

* refactor by CR

* refactor by CR

* make CI happy
chunshao90 authored Dec 27, 2022
1 parent 97f7816 commit cbed6a7
Showing 16 changed files with 757 additions and 48 deletions.
5 changes: 4 additions & 1 deletion Cargo.lock


28 changes: 27 additions & 1 deletion common_types/src/projected_schema.rs
@@ -4,7 +4,7 @@

use std::{fmt, sync::Arc};

use snafu::{ensure, Backtrace, ResultExt, Snafu};
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};

use crate::{
column_schema::{ColumnSchema, ReadOp},
@@ -36,6 +36,14 @@ pub enum Error {
backtrace
))]
MissingReadColumn { name: String, backtrace: Backtrace },

#[snafu(display("Empty table schema.\nBacktrace:\n{}", backtrace))]
EmptyTableSchema { backtrace: Backtrace },

#[snafu(display("Failed to covert table schema, err:{}", source))]
ConvertTableSchema {
source: Box<dyn std::error::Error + Send + Sync>,
},
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -147,6 +155,24 @@ impl ProjectedSchema {
}
}

impl TryFrom<proto::common::ProjectedSchema> for ProjectedSchema {
type Error = Error;

fn try_from(pb: proto::common::ProjectedSchema) -> std::result::Result<Self, Self::Error> {
let schema: Schema = pb
.table_schema
.context(EmptyTableSchema)?
.try_into()
.map_err(|e| Box::new(e) as _)
.context(ConvertTableSchema)?;
let projection = pb
.projection
.map(|v| v.idx.into_iter().map(|id| id as usize).collect());

ProjectedSchema::new(schema, projection)
}
}
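
As a usage illustration (editor's sketch, not part of the diff): decoding the projected schema carried by a remote engine request, assuming a prost-generated `proto::common::ProjectedSchema` is already at hand.

```rust
use std::convert::TryFrom;

use common_types::projected_schema::{ProjectedSchema, Result};

// Hypothetical helper: turn the schema embedded in an incoming RPC message
// into the engine's in-memory representation via the TryFrom impl above.
fn decode_projected_schema(pb: proto::common::ProjectedSchema) -> Result<ProjectedSchema> {
    // Fails with EmptyTableSchema when the table schema field is unset, and
    // with ConvertTableSchema when the embedded schema cannot be decoded.
    ProjectedSchema::try_from(pb)
}
```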

/// Schema with projection information
struct ProjectedSchemaInner {
/// The schema before projection that the reader intended to read, may
99 changes: 95 additions & 4 deletions server/src/avro_util.rs → common_util/src/avro.rs
@@ -7,17 +7,23 @@ use std::collections::HashMap;
use avro_rs::{
schema::{Name, RecordField, RecordFieldOrder},
types::{Record, Value},
Schema as AvroSchema,
};
use common_types::{
bytes::ByteVec,
bytes::{ByteVec, Bytes},
column::ColumnBlock,
datum::{Datum, DatumKind},
record_batch::RecordBatch,
schema::RecordSchema,
row::{Row, RowGroup, RowGroupBuilder},
schema::{RecordSchema, Schema},
string::StringBytes,
time::Timestamp,
};
use common_util::define_result;
use snafu::{Backtrace, ResultExt, Snafu};

/// Schema name of the record
const RECORD_NAME: &str = "Result";

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(
@@ -29,6 +35,21 @@ pub enum Error {
source: avro_rs::Error,
backtrace: Backtrace,
},

#[snafu(display("Failed to convert to avro record, err:{}", source))]
ConvertToAvroRecord {
source: Box<dyn std::error::Error + Send + Sync>,
},

#[snafu(display(
"Invalid avro record, expect record, value:{:?}.\nBacktrace:\n{}",
value,
backtrace
))]
InvalidAvroRecord { value: Value, backtrace: Backtrace },

#[snafu(display("Unsupported arvo type, value:{:?}.\nBacktrace:\n{}", value, backtrace))]
UnsupportedType { value: Value, backtrace: Backtrace },
}

define_result!(Error);
@@ -85,6 +106,25 @@ pub fn to_avro_schema(name: &str, schema: &RecordSchema) -> avro_rs::Schema {
}
}

pub fn record_batch_to_avro_rows(record_batch: &RecordBatch) -> Result<Vec<ByteVec>> {
let mut rows = Vec::new();
let avro_schema = to_avro_schema(RECORD_NAME, record_batch.schema());
record_batch_to_avro(record_batch, &avro_schema, &mut rows)?;
Ok(rows)
}

pub fn avro_rows_to_row_group(schema: Schema, rows: &[Vec<u8>]) -> Result<RowGroup> {
let avro_schema = to_avro_schema(RECORD_NAME, &schema.to_record_schema());
let mut builder = RowGroupBuilder::with_capacity(schema.clone(), rows.len());
for raw_row in rows {
let mut row = Vec::with_capacity(schema.num_columns());
avro_row_to_row(&avro_schema, raw_row, &mut row)?;
builder.push_checked_row(Row::from_datums(row));
}

Ok(builder.build())
}
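
A hedged round-trip sketch of the two new helpers (editor's illustration; `record_batch` and `schema` are assumed to be a populated `RecordBatch` and its matching `Schema`):

```rust
use common_types::{record_batch::RecordBatch, row::RowGroup, schema::Schema};
use common_util::avro;

// Hypothetical round trip: the server encodes a record batch row by row,
// and the receiving side rebuilds a RowGroup from the raw avro datums.
fn roundtrip(record_batch: &RecordBatch, schema: Schema) -> avro::Result<RowGroup> {
    // One avro-encoded byte buffer per row of the batch.
    let encoded_rows = avro::record_batch_to_avro_rows(record_batch)?;
    // Decoding needs the same schema so the datums map back onto columns.
    avro::avro_rows_to_row_group(schema, &encoded_rows)
}
```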

fn data_type_to_schema(data_type: &DatumKind) -> avro_rs::Schema {
match data_type {
DatumKind::Null => avro_rs::Schema::Null,
@@ -104,7 +144,7 @@ fn data_type_to_schema(data_type: &DatumKind) -> avro_rs::Schema {
}

/// Convert record batch to avro format
pub fn record_batch_to_avro(
fn record_batch_to_avro(
record_batch: &RecordBatch,
schema: &avro_rs::Schema,
rows: &mut Vec<ByteVec>,
@@ -158,6 +198,41 @@ fn column_to_value(array: &ColumnBlock, row_idx: usize, is_nullable: bool) -> Va
}
}

/// Convert the avro `Value` into the `Datum`.
///
/// Only the avro types produced by the server's own encoding are handled;
/// any other avro type is rejected with `UnsupportedType`.
fn avro_value_to_datum(value: Value) -> Result<Datum> {
let datum = match value {
Value::Null => Datum::Null,
Value::TimestampMillis(v) => Datum::Timestamp(Timestamp::new(v)),
Value::Double(v) => Datum::Double(v),
Value::Float(v) => Datum::Float(v),
Value::Bytes(v) => Datum::Varbinary(Bytes::from(v)),
Value::String(v) => Datum::String(StringBytes::from(v)),
// FIXME: The server encodes both uint64 and int64 as `Value::Long` because avro has no
// unsigned 64-bit type, so values above i64::MAX are reinterpreted and may decode incorrectly.
Value::Long(v) => Datum::Int64(v),
Value::Int(v) => Datum::Int32(v),
Value::Boolean(v) => Datum::Boolean(v),
Value::Union(inner_val) => avro_value_to_datum(*inner_val)?,
Value::Fixed(_, _)
| Value::Enum(_, _)
| Value::Array(_)
| Value::Map(_)
| Value::Record(_)
| Value::Date(_)
| Value::Decimal(_)
| Value::TimeMillis(_)
| Value::TimeMicros(_)
| Value::TimestampMicros(_)
| Value::Duration(_)
| Value::Uuid(_) => return UnsupportedType { value }.fail(),
};

Ok(datum)
}
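
To make the FIXME above concrete (editor's illustration, plain Rust with no crate dependencies): avro's `Long` is signed, so a large unsigned value survives the trip only if the reader knows the column was unsigned.

```rust
fn main() {
    let original: u64 = u64::MAX;
    // What the server would store in an avro `Value::Long`.
    let as_avro_long = original as i64;
    assert_eq!(as_avro_long, -1); // the sign bit is reinterpreted
    // Casting back recovers the value, but only because we know it was a u64.
    assert_eq!(as_avro_long as u64, original);
}
```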

#[inline]
fn may_union(val: Value, is_nullable: bool) -> Value {
if is_nullable {
Expand All @@ -166,3 +241,19 @@ fn may_union(val: Value, is_nullable: bool) -> Value {
val
}
}

fn avro_row_to_row(schema: &AvroSchema, mut raw: &[u8], row: &mut Vec<Datum>) -> Result<()> {
let record = avro_rs::from_avro_datum(schema, &mut raw, None)
.map_err(|e| Box::new(e) as _)
.context(ConvertToAvroRecord)?;
if let Value::Record(cols) = record {
for (_, column_value) in cols {
let datum = avro_value_to_datum(column_value)?;
row.push(datum);
}

Ok(())
} else {
InvalidAvroRecord { value: record }.fail()
}
}
1 change: 1 addition & 0 deletions common_util/src/lib.rs
@@ -9,6 +9,7 @@ pub mod macros;

// TODO(yingwen): Move some mod into components as a crate
pub mod alloc_tracker;
pub mod avro;
pub mod codec;
pub mod config;
pub mod error;
10 changes: 10 additions & 0 deletions proto/protos/common.proto
@@ -61,3 +61,13 @@ message TimeRange {
// exclusive end
int64 end = 2;
}

// Projected Schema
message ProjectedSchema {
common.TableSchema table_schema = 1;
Projection projection = 2;
}

message Projection {
repeated uint64 idx = 1;
}
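
For illustration (not part of the commit), a sketch of filling these messages from Rust, assuming prost-generated types where message fields are `Option` and repeated fields are `Vec`; `table_schema_pb` stands in for an already-built `common.TableSchema`:

```rust
// Hypothetical builder: project columns 0 and 2 of the table schema.
fn build_projected_schema(
    table_schema_pb: proto::common::TableSchema,
) -> proto::common::ProjectedSchema {
    proto::common::ProjectedSchema {
        table_schema: Some(table_schema_pb),
        // Omitting the projection means "read the full schema".
        projection: Some(proto::common::Projection { idx: vec![0, 2] }),
    }
}
```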
23 changes: 9 additions & 14 deletions proto/protos/remote_engine.proto
@@ -26,15 +26,6 @@ message ReadOptions {
uint64 read_parallelism = 2;
}

message Projection {
repeated uint64 idx = 1;
}

message ProjectedSchema {
common.TableSchema table_schema = 1;
Projection projection = 2;
}

message Predicate {
repeated bytes exprs = 1;
common.TimeRange time_range = 2;
@@ -49,7 +40,7 @@
message TableReadRequest {
uint64 request_id = 1;
ReadOptions opts = 2;
ProjectedSchema projected_schema = 3;
common.ProjectedSchema projected_schema = 3;
Predicate predicate = 4;
ReadOrder order = 5;
}
@@ -61,14 +52,18 @@

message ReadResponse {
ResponseHeader header = 1;
repeated bytes rows = 2;
// Version of row encoding method
uint32 version = 2;
repeated bytes rows = 3;
}

message RowGroup {
common.TableSchema table_schema = 1;
repeated bytes rows = 2;
int64 min_timestamp = 3;
int64 max_timestamp = 4;
int64 min_timestamp = 2;
int64 max_timestamp = 3;
// Version of row encoding method
uint32 version = 4;
repeated bytes rows = 5;
}
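
As an editor's sketch of the reshaped message (assuming prost-generated types; `encoded_rows` stands for rows already serialized with whatever row encoding the `version` field names, and the timestamps are plain millisecond values):

```rust
// Hypothetical assembly of a RowGroup for a write request.
fn build_row_group(
    table_schema_pb: proto::common::TableSchema,
    encoded_rows: Vec<Vec<u8>>,
    min_timestamp: i64,
    max_timestamp: i64,
    version: u32,
) -> proto::remote_engine::RowGroup {
    proto::remote_engine::RowGroup {
        table_schema: Some(table_schema_pb),
        min_timestamp,
        max_timestamp,
        version,
        rows: encoded_rows,
    }
}
```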

message WriteRequest {
18 changes: 17 additions & 1 deletion server/src/grpc/mod.rs
@@ -25,6 +25,7 @@ use common_util::{
};
use futures::FutureExt;
use log::{info, warn};
use proto::remote_engine::remote_engine_service_server::RemoteEngineServiceServer;
use query_engine::executor::Executor as QueryExecutor;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::engine::EngineRuntimes;
@@ -35,7 +36,7 @@ use crate::{
config::Endpoint,
grpc::{
forward::Forwarder, meta_event_service::MetaServiceImpl,
storage_service::StorageServiceImpl,
remote_engine_service::RemoteEngineServiceImpl, storage_service::StorageServiceImpl,
},
instance::InstanceRef,
route::RouterRef,
@@ -45,6 +46,7 @@ pub mod forward;
pub mod forward;
mod meta_event_service;
mod metrics;
mod remote_engine_service;
mod storage_service;

#[derive(Debug, Snafu)]
@@ -146,6 +148,7 @@ pub struct RpcServices<Q: QueryExecutor + 'static> {
serve_addr: SocketAddr,
rpc_server: StorageServiceServer<StorageServiceImpl<Q>>,
meta_rpc_server: Option<MetaEventServiceServer<MetaServiceImpl<Q>>>,
remote_engine_server: RemoteEngineServiceServer<RemoteEngineServiceImpl<Q>>,
runtime: Arc<Runtime>,
stop_tx: Option<Sender<()>>,
join_handle: Option<JoinHandle<()>>,
@@ -155,6 +158,7 @@ impl<Q: QueryExecutor + 'static> RpcServices<Q> {
pub async fn start(&mut self) -> Result<()> {
let rpc_server = self.rpc_server.clone();
let meta_rpc_server = self.meta_rpc_server.clone();
let remote_engine_server = self.remote_engine_server.clone();
let serve_addr = self.serve_addr;
let (stop_tx, stop_rx) = oneshot::channel();
let join_handle = self.runtime.spawn(async move {
@@ -167,6 +171,9 @@
router = router.add_service(s);
};

info!("Grpc server serves remote engine rpc service");
router = router.add_service(remote_engine_server);

let serve_res = router
.serve_with_shutdown(serve_addr, stop_rx.map(drop))
.await;
@@ -277,6 +284,14 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
MetaEventServiceServer::new(meta_service)
});

let remote_engine_server = {
let service = RemoteEngineServiceImpl {
instance: instance.clone(),
runtimes: runtimes.clone(),
};
RemoteEngineServiceServer::new(service)
};

let forward_config = self.forward_config.unwrap_or_default();
let forwarder = if forward_config.enable {
let local_endpoint =
Expand Down Expand Up @@ -306,6 +321,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
serve_addr,
rpc_server,
meta_rpc_server,
remote_engine_server,
runtime: bg_runtime,
stop_tx: None,
join_handle: None,
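
To round the change off, a hedged client-side sketch of reaching the newly registered service over tonic; the client module path follows tonic's usual codegen convention and the address is a placeholder, so both are assumptions rather than details taken from this commit.

```rust
use proto::remote_engine::remote_engine_service_client::RemoteEngineServiceClient;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder endpoint; in practice this is whatever gRPC address the
    // server configured above binds to.
    let mut client = RemoteEngineServiceClient::connect("http://127.0.0.1:8831").await?;
    // A connected client can now issue the ReadRequest/WriteRequest messages
    // defined in remote_engine.proto, e.g. `client.write(request).await`.
    let _ = &mut client;
    Ok(())
}
```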