Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl remote_engine grpc service #508

Merged
merged 8 commits into from
Dec 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 27 additions & 1 deletion common_types/src/projected_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use std::{fmt, sync::Arc};

use snafu::{ensure, Backtrace, ResultExt, Snafu};
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};

use crate::{
column_schema::{ColumnSchema, ReadOp},
Expand Down Expand Up @@ -36,6 +36,14 @@ pub enum Error {
backtrace
))]
MissingReadColumn { name: String, backtrace: Backtrace },

#[snafu(display("Empty table schema.\nBacktrace:\n{}", backtrace))]
EmptyTableSchema { backtrace: Backtrace },

#[snafu(display("Failed to covert table schema, err:{}", source))]
ConvertTableSchema {
source: Box<dyn std::error::Error + Send + Sync>,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -147,6 +155,24 @@ impl ProjectedSchema {
}
}

impl TryFrom<proto::common::ProjectedSchema> for ProjectedSchema {
type Error = Error;

fn try_from(pb: proto::common::ProjectedSchema) -> std::result::Result<Self, Self::Error> {
let schema: Schema = pb
.table_schema
.context(EmptyTableSchema)?
.try_into()
.map_err(|e| Box::new(e) as _)
.context(ConvertTableSchema)?;
let projection = pb
.projection
.map(|v| v.idx.into_iter().map(|id| id as usize).collect());

ProjectedSchema::new(schema, projection)
}
}

/// Schema with projection informations
struct ProjectedSchemaInner {
/// The schema before projection that the reader intended to read, may
Expand Down
2 changes: 2 additions & 0 deletions common_util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ test = ["env_logger"]
[dependencies]
# In alphabetical order
arrow = { workspace = true }
avro-rs = "0.13"
backtrace = "0.3.9"
chrono = { workspace = true }
common_types = { workspace = true, features = ["test"] }
Expand All @@ -30,6 +31,7 @@ prometheus = { workspace = true }
proto = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
snafu = { workspace = true }
time = "0.1"
tokio = { workspace = true }
Expand Down
99 changes: 95 additions & 4 deletions server/src/avro_util.rs → common_util/src/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,23 @@ use std::collections::HashMap;
use avro_rs::{
schema::{Name, RecordField, RecordFieldOrder},
types::{Record, Value},
Schema as AvroSchema,
};
use common_types::{
bytes::ByteVec,
bytes::{ByteVec, Bytes},
column::ColumnBlock,
datum::{Datum, DatumKind},
record_batch::RecordBatch,
schema::RecordSchema,
row::{Row, RowGroup, RowGroupBuilder},
schema::{RecordSchema, Schema},
string::StringBytes,
time::Timestamp,
};
use common_util::define_result;
use snafu::{Backtrace, ResultExt, Snafu};

/// Schema name of the record
const RECORD_NAME: &str = "Result";

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(
Expand All @@ -29,6 +35,21 @@ pub enum Error {
source: avro_rs::Error,
backtrace: Backtrace,
},

#[snafu(display("Failed to convert to avro record, err:{}", source))]
ConvertToAvroRecord {
source: Box<dyn std::error::Error + Send + Sync>,
},

#[snafu(display(
"Invalid avro record, expect record, value:{:?}.\nBacktrace:\n{}",
chunshao90 marked this conversation as resolved.
Show resolved Hide resolved
value,
backtrace
))]
InvalidAvroRecord { value: Value, backtrace: Backtrace },

#[snafu(display("Unsupported arvo type, value:{:?}.\nBacktrace:\n{}", value, backtrace))]
UnsupportedType { value: Value, backtrace: Backtrace },
}

define_result!(Error);
Expand Down Expand Up @@ -85,6 +106,25 @@ pub fn to_avro_schema(name: &str, schema: &RecordSchema) -> avro_rs::Schema {
}
}

pub fn record_batch_to_avro_rows(record_batch: &RecordBatch) -> Result<Vec<ByteVec>> {
let mut rows = Vec::new();
let avro_schema = to_avro_schema(RECORD_NAME, record_batch.schema());
record_batch_to_avro(record_batch, &avro_schema, &mut rows)?;
Ok(rows)
}

pub fn avro_rows_to_row_group(schema: Schema, rows: &[Vec<u8>]) -> Result<RowGroup> {
let avro_schema = to_avro_schema(RECORD_NAME, &schema.to_record_schema());
let mut builder = RowGroupBuilder::with_capacity(schema.clone(), rows.len());
for raw_row in rows {
let mut row = Vec::with_capacity(schema.num_columns());
avro_row_to_row(&avro_schema, raw_row, &mut row)?;
builder.push_checked_row(Row::from_datums(row));
}

Ok(builder.build())
}

fn data_type_to_schema(data_type: &DatumKind) -> avro_rs::Schema {
match data_type {
DatumKind::Null => avro_rs::Schema::Null,
Expand All @@ -104,7 +144,7 @@ fn data_type_to_schema(data_type: &DatumKind) -> avro_rs::Schema {
}

/// Convert record batch to avro format
pub fn record_batch_to_avro(
fn record_batch_to_avro(
record_batch: &RecordBatch,
schema: &avro_rs::Schema,
rows: &mut Vec<ByteVec>,
Expand Down Expand Up @@ -158,6 +198,41 @@ fn column_to_value(array: &ColumnBlock, row_idx: usize, is_nullable: bool) -> Va
}
}

/// Convert the avro `Value` into the `Datum`.
///
/// Some types defined by avro are not used and the conversion rule is totally
/// based on the implementation in the server.
fn avro_value_to_datum(value: Value) -> Result<Datum> {
let datum = match value {
Value::Null => Datum::Null,
Value::TimestampMillis(v) => Datum::Timestamp(Timestamp::new(v)),
Value::Double(v) => Datum::Double(v),
Value::Float(v) => Datum::Float(v),
Value::Bytes(v) => Datum::Varbinary(Bytes::from(v)),
Value::String(v) => Datum::String(StringBytes::from(v)),
// FIXME: Now the server converts both uint64 and int64 into`Value::Long` because uint64 is
// not supported by avro, that is to say something may go wrong in some corner case.
Value::Long(v) => Datum::Int64(v),
Value::Int(v) => Datum::Int32(v),
Value::Boolean(v) => Datum::Boolean(v),
Value::Union(inner_val) => avro_value_to_datum(*inner_val)?,
Value::Fixed(_, _)
| Value::Enum(_, _)
| Value::Array(_)
| Value::Map(_)
| Value::Record(_)
| Value::Date(_)
| Value::Decimal(_)
| Value::TimeMillis(_)
| Value::TimeMicros(_)
| Value::TimestampMicros(_)
| Value::Duration(_)
| Value::Uuid(_) => return UnsupportedType { value }.fail(),
};

Ok(datum)
}

#[inline]
fn may_union(val: Value, is_nullable: bool) -> Value {
if is_nullable {
Expand All @@ -166,3 +241,19 @@ fn may_union(val: Value, is_nullable: bool) -> Value {
val
}
}

fn avro_row_to_row(schema: &AvroSchema, mut raw: &[u8], row: &mut Vec<Datum>) -> Result<()> {
let record = avro_rs::from_avro_datum(schema, &mut raw, None)
.map_err(|e| Box::new(e) as _)
.context(ConvertToAvroRecord)?;
if let Value::Record(cols) = record {
for (_, column_value) in cols {
let datum = avro_value_to_datum(column_value)?;
row.push(datum);
}

Ok(())
} else {
InvalidAvroRecord { value: record }.fail()
}
}
1 change: 1 addition & 0 deletions common_util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod macros;

// TODO(yingwen): Move some mod into components as a crate
pub mod alloc_tracker;
pub mod avro;
pub mod codec;
pub mod config;
pub mod error;
Expand Down
10 changes: 10 additions & 0 deletions proto/protos/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,13 @@ message TimeRange {
// exclusive end
int64 end = 2;
}

// Projected Schema
message ProjectedSchema {
common.TableSchema table_schema = 1;
Projection projection = 2;
}

message Projection {
repeated uint64 idx = 1;
}
23 changes: 9 additions & 14 deletions proto/protos/remote_engine.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,6 @@ message ReadOptions {
uint64 read_parallelism = 2;
}

message Projection {
repeated uint64 idx = 1;
}

message ProjectedSchema {
common.TableSchema table_schema = 1;
Projection projection = 2;
}

message Predicate {
repeated bytes exprs = 1;
common.TimeRange time_range = 2;
Expand All @@ -49,7 +40,7 @@ enum ReadOrder {
message TableReadRequest {
uint64 request_id = 1;
ReadOptions opts = 2;
ProjectedSchema projected_schema = 3;
common.ProjectedSchema projected_schema = 3;
Predicate predicate = 4;
ReadOrder order = 5;
}
Expand All @@ -61,14 +52,18 @@ message ReadRequest {

message ReadResponse {
ResponseHeader header = 1;
repeated bytes rows = 2;
// Version of row encoding method
uint32 version = 2;
repeated bytes rows = 3;
}

message RowGroup {
// Version of row encoding method
common.TableSchema table_schema = 1;
repeated bytes rows = 2;
int64 min_timestamp = 3;
int64 max_timestamp = 4;
int64 min_timestamp = 2;
int64 max_timestamp = 3;
uint32 version = 4;
repeated bytes rows = 5;
}

message WriteRequest {
Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ workspace = true
analytic_engine = { workspace = true }
arrow = { workspace = true }
async-trait = { workspace = true }
avro-rs = "0.13"
bytes = { workspace = true }
catalog = { workspace = true }
ceresdbproto = { workspace = true }
Expand All @@ -36,6 +35,7 @@ profile = { workspace = true }
prometheus = { workspace = true }
prometheus-static-metric = { workspace = true }
prost = { workspace = true }
proto = { workspace = true }
query_engine = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
Expand Down
Loading