modify avro util.
Rachelint committed Dec 26, 2022
1 parent 6139d89 commit b4f42a3
Showing 19 changed files with 708 additions and 19 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -103,6 +103,8 @@ tonic = "0.8.1"
tokio = { version = "1.21", features = ["full"] }
wal = { path = "wal" }
message_queue = { path = "components/message_queue" }
clru = "0.6.1"
avro-rs = "0.13"

[workspace.dependencies.ceresdbproto]
git = "https://github.com/CeresDB/ceresdbproto.git"
18 changes: 18 additions & 0 deletions common_types/src/projected_schema.rs
@@ -147,6 +147,24 @@ impl ProjectedSchema {
}
}

impl From<ProjectedSchema> for proto::remote_engine::ProjectedSchema {
fn from(request: ProjectedSchema) -> Self {
let table_schema_pb = (&request.0.original_schema).into();
let projection_pb = request.0.projection.as_ref().map(|project| {
let project = project
.iter()
.map(|one_project| *one_project as u64)
.collect::<Vec<u64>>();
proto::remote_engine::Projection { idx: project }
});

Self {
table_schema: Some(table_schema_pb),
projection: projection_pb,
}
}
}

/// Schema with projection information
struct ProjectedSchemaInner {
/// The schema before projection that the reader intended to read, may
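
The conversion above widens each projection index to `u64` and keeps `None` when the reader wants every column. A tiny standalone sketch of that mapping, using a hypothetical `ProjectionPb` mirror struct instead of the generated proto type:

```rust
// Hypothetical mirror of proto::remote_engine::Projection, for illustration only.
#[derive(Debug)]
struct ProjectionPb {
    idx: Vec<u64>,
}

fn projection_to_pb(projection: Option<Vec<usize>>) -> Option<ProjectionPb> {
    // Widen each column index to u64; a missing projection stays `None`,
    // meaning "read all columns".
    projection.map(|indices| ProjectionPb {
        idx: indices.into_iter().map(|i| i as u64).collect(),
    })
}

fn main() {
    // Projected read of columns 0 and 3.
    println!("{:?}", projection_to_pb(Some(vec![0, 3])));
    // No projection: the reader wants every column, so nothing is sent.
    println!("{:?}", projection_to_pb(None));
}
```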
2 changes: 1 addition & 1 deletion common_types/src/request_id.rs
@@ -8,7 +8,7 @@ use std::{
};

#[derive(Debug, Clone, Copy)]
pub struct RequestId(u64);
pub struct RequestId(pub u64);

impl RequestId {
/// Acquire next request id.
2 changes: 2 additions & 0 deletions common_util/Cargo.toml
@@ -16,6 +16,7 @@ test = ["env_logger"]
[dependencies]
# In alphabetical order
arrow = { workspace = true }
avro-rs = { workspace = true }
backtrace = "0.3.9"
chrono = { workspace = true }
common_types = { workspace = true, features = ["test"] }
@@ -30,6 +31,7 @@ prometheus = { workspace = true }
proto = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
snafu = { workspace = true }
time = "0.1"
tokio = { workspace = true }
81 changes: 79 additions & 2 deletions server/src/avro_util.rs → common_util/src/avro_util.rs
@@ -9,15 +9,20 @@ use avro_rs::{
types::{Record, Value},
};
use common_types::{
bytes::ByteVec,
bytes::{ByteVec, Bytes},
column::ColumnBlock,
column_schema::ColumnSchema,
datum::{Datum, DatumKind},
record_batch::RecordBatch,
row::Row,
schema::RecordSchema,
string::StringBytes,
time::Timestamp,
};
use common_util::define_result;
use snafu::{Backtrace, ResultExt, Snafu};

use crate::define_result;

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(
@@ -29,13 +34,27 @@ pub enum Error {
source: avro_rs::Error,
backtrace: Backtrace,
},

#[snafu(display("Failed to convert avro raw to row, err:{}.", source,))]
RawToRowWithCause { source: avro_rs::Error },

#[snafu(display(
"Failed to convert avro raw to row, msg:{}.\nBacktrace:\n{}",
msg,
backtrace
))]
RawToRowNoCause { msg: String, backtrace: Backtrace },
}

define_result!(Error);

/// Create [avro_rs::Schema] with given `name` from [RecordSchema]
pub fn to_avro_schema(name: &str, schema: &RecordSchema) -> avro_rs::Schema {
let columns = schema.columns();
columns_to_avro_schema(name, columns)
}

pub fn columns_to_avro_schema(name: &str, columns: &[ColumnSchema]) -> avro_rs::Schema {
let mut lookup = HashMap::with_capacity(columns.len());
let mut avro_fields = Vec::with_capacity(columns.len());

@@ -138,6 +157,10 @@ pub fn record_batch_to_avro(
/// Panic if row_idx is out of bound.
fn column_to_value(array: &ColumnBlock, row_idx: usize, is_nullable: bool) -> Value {
let datum = array.datum(row_idx);
datum_to_value(datum, is_nullable)
}

pub fn datum_to_value(datum: Datum, is_nullable: bool) -> Value {
match datum {
Datum::Null => may_union(Value::Null, is_nullable),
Datum::Timestamp(v) => may_union(Value::TimestampMillis(v.as_i64()), is_nullable),
@@ -166,3 +189,57 @@ fn may_union(val: Value, is_nullable: bool) -> Value {
val
}
}

pub fn raw_to_row(schema: &avro_rs::Schema, mut raw: &[u8], row: &mut Vec<Datum>) -> Result<()> {
let record = avro_rs::from_avro_datum(schema, &mut raw, None).context(RawToRowWithCause)?;

if let Value::Record(cols) = record {
for (_, column_value) in cols {
let datum = value_to_datum(column_value)?;
row.push(datum);
}

Ok(())
} else {
return RawToRowNoCause {
msg: "invalid avro record",
}
.fail();
}
}

fn value_to_datum(value: Value) -> Result<Datum> {
let datum = match value {
Value::Null => Datum::Null,
Value::TimestampMillis(v) => Datum::Timestamp(Timestamp::new(v)),
Value::Double(v) => Datum::Double(v),
Value::Float(v) => Datum::Float(v),
Value::Bytes(v) => Datum::Varbinary(Bytes::from(v)),
Value::String(v) => Datum::String(StringBytes::from(v)),
// FIXME: The server converts both uint64 and int64 into `Value::Long` because avro
// does not support uint64, so values may be misinterpreted in some corner cases.
Value::Long(v) => Datum::Int64(v),
Value::Int(v) => Datum::Int32(v),
Value::Boolean(v) => Datum::Boolean(v),
Value::Union(inner_val) => value_to_datum(*inner_val)?,
Value::Fixed(_, _)
| Value::Enum(_, _)
| Value::Array(_)
| Value::Map(_)
| Value::Record(_)
| Value::Date(_)
| Value::Decimal(_)
| Value::TimeMillis(_)
| Value::TimeMicros(_)
| Value::TimestampMicros(_)
| Value::Duration(_)
| Value::Uuid(_) => {
return RawToRowNoCause {
msg: "invalid avro value type",
}
.fail()
}
};

Ok(datum)
}
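
The helpers above lean on avro-rs 0.13's raw-datum API: `to_avro_schema` builds an `avro_rs::Schema`, rows are written as bare datums, and `raw_to_row` decodes them with `from_avro_datum` before mapping each avro `Value` to a `Datum`. A standalone sketch of that round trip (the schema and values are hypothetical, not taken from the commit):

```rust
use avro_rs::{
    from_avro_datum, to_avro_datum,
    types::{Record, Value},
    Schema,
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical record schema; in the commit it would come from `to_avro_schema`.
    let schema = Schema::parse_str(
        r#"{
            "type": "record",
            "name": "sample_table",
            "fields": [
                {"name": "timestamp", "type": "long"},
                {"name": "value", "type": ["null", "double"]}
            ]
        }"#,
    )?;

    // Encode one row as a bare avro datum (no object-container framing).
    let mut record = Record::new(&schema).expect("schema must be a record");
    record.put("timestamp", Value::Long(1_672_000_000_000));
    // Nullable columns are wrapped in a union, mirroring `may_union`.
    record.put("value", Value::Union(Box::new(Value::Double(42.0))));
    let raw: Vec<u8> = to_avro_datum(&schema, record)?;

    // Decode it back, as `raw_to_row` does before calling `value_to_datum`.
    let decoded = from_avro_datum(&schema, &mut raw.as_slice(), None)?;
    if let Value::Record(cols) = decoded {
        for (name, value) in cols {
            println!("{}: {:?}", name, value);
        }
    }
    Ok(())
}
```

Nullable columns travel as avro unions, which is why `value_to_datum` unwraps `Value::Union` recursively on the read path.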
1 change: 1 addition & 0 deletions common_util/src/lib.rs
@@ -9,6 +9,7 @@ pub mod macros;

// TODO(yingwen): Move some mod into components as a crate
pub mod alloc_tracker;
pub mod avro_util;
pub mod codec;
pub mod config;
pub mod error;
2 changes: 1 addition & 1 deletion components/object_store/Cargo.toml
@@ -14,7 +14,7 @@ workspace = true
async-trait = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
clru = "0.6.1"
clru = { workspace = true }
common_util = { workspace = true }
crc = "3.0.0"
futures = { workspace = true }
11 changes: 4 additions & 7 deletions server/src/grpc/storage_service/query.rs
@@ -9,7 +9,7 @@ use ceresdbproto::{
storage::{query_response, QueryRequest, QueryResponse},
};
use common_types::{record_batch::RecordBatch, request_id::RequestId};
use common_util::time::InstantExt;
use common_util::{avro_util, time::InstantExt};
use http::StatusCode;
use interpreters::{context::Context as InterpreterContext, factory::Factory, interpreter::Output};
use log::info;
@@ -20,12 +20,9 @@ use sql::{
provider::CatalogMetaProvider,
};

use crate::{
avro_util,
grpc::storage_service::{
error::{ErrNoCause, ErrWithCause, Result},
HandlerContext,
},
use crate::grpc::storage_service::{
error::{ErrNoCause, ErrWithCause, Result},
HandlerContext,
};

/// Schema name of the record
1 change: 0 additions & 1 deletion server/src/lib.rs
@@ -8,7 +8,6 @@
#[macro_use]
extern crate common_util;

mod avro_util;
pub mod config;
mod consts;
mod context;
4 changes: 4 additions & 0 deletions table_engine/Cargo.toml
@@ -14,10 +14,13 @@ workspace = true
# In alphabetical order
arrow = { workspace = true }
async-trait = { workspace = true }
avro-rs = { workspace = true }
clru = { workspace = true }
common_types = { workspace = true }
common_util = { workspace = true }
datafusion = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-proto = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
parquet = { workspace = true }
@@ -29,3 +32,4 @@ serde_derive = { workspace = true }
smallvec = { workspace = true }
snafu = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }
37 changes: 35 additions & 2 deletions table_engine/src/predicate.rs
@@ -13,16 +13,23 @@ use datafusion::{
logical_plan::{Expr, Operator},
scalar::ScalarValue,
};
use datafusion_proto::{self, bytes::Serializeable};
use log::debug;
use snafu::Snafu;
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
#[snafu(visibility = "pub")]
pub enum Error {
#[snafu(display("Failed ot do pruning, err:{}", source))]
#[snafu(display("Failed to do pruning, err:{}", source))]
Prune {
source: datafusion::error::DataFusionError,
},

#[snafu(display("Failed to convert predicate to pb, msg:{}, err:{}", msg, source))]
PredicateToPb {
msg: String,
source: datafusion::error::DataFusionError,
},
}

define_result!(Error);
@@ -69,6 +76,32 @@ impl Predicate {
}
}

impl TryFrom<&Predicate> for proto::remote_engine::Predicate {
type Error = Error;

fn try_from(predicate: &Predicate) -> std::result::Result<Self, Self::Error> {
let time_range = predicate.time_range;
let mut exprs = Vec::with_capacity(predicate.exprs.len());
for expr in &predicate.exprs {
let expr = expr
.to_bytes()
.context(PredicateToPb {
msg: format!("convert expr failed, expr:{}", expr),
})?
.to_vec();
exprs.push(expr);
}

Ok(Self {
exprs,
time_range: Some(proto::common::TimeRange {
start: time_range.inclusive_start().as_i64(),
end: time_range.exclusive_end().as_i64(),
}),
})
}
}

/// Builder for [Predicate]
#[derive(Debug, Clone, Default)]
#[must_use]
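
The `TryFrom` impl above serializes each DataFusion `Expr` with `datafusion_proto`'s `Serializeable::to_bytes` so the predicate can cross the remote-engine RPC boundary. A minimal sketch of that round trip; the filter expression is made up, and the reverse path is assumed to use the trait's `Expr::from_bytes` counterpart:

```rust
use datafusion::logical_plan::Expr;
use datafusion::prelude::{col, lit};
use datafusion_proto::bytes::Serializeable;

fn main() -> datafusion::error::Result<()> {
    // Hypothetical filter pushed down to the remote engine.
    let expr = col("timestamp").gt(lit(1_672_000_000_000_i64));

    // Forward path: what `TryFrom<&Predicate>` stores into `exprs`.
    let bytes = expr.to_bytes()?.to_vec();
    println!("encoded filter expr into {} bytes", bytes.len());

    // Reverse path (assumed): the receiving side restores the expression.
    let decoded = Expr::from_bytes(&bytes)?;
    assert_eq!(format!("{:?}", decoded), format!("{:?}", expr));
    Ok(())
}
```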
29 changes: 27 additions & 2 deletions table_engine/src/remote/mod.rs
@@ -3,16 +3,41 @@
//! Remote table engine
pub mod model;
pub mod remote_impl;

use async_trait::async_trait;
use common_util::define_result;
use model::{ReadRequest, WriteRequest};
use snafu::Snafu;
use snafu::{Backtrace, Snafu};

use crate::stream::SendableRecordBatchStream;

#[derive(Debug, Snafu)]
pub enum Error {}
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to convert read request to pb, err:{}", source))]
ReadRequestToPb { source: crate::table::Error },

#[snafu(display(
"Failed to convert write request to pb, msg:{}.\nBacktrace:\n{}",
msg,
backtrace
))]
WriteRequestToPbNoCause { msg: String, backtrace: Backtrace },

#[snafu(display("Failed to convert write request to pb, msg:{}, err:{}", msg, source))]
WriteRequestToPbWithCause {
msg: String,
source: Box<dyn std::error::Error + Send + Sync>,
},
// #[snafu(display("Failed to covert table read request, err:{}", source))]
// ConvertTableReadRequest {
// source: Box<dyn std::error::Error + Send + Sync>,
// },

// #[snafu(display("Empty row group.\nBacktrace:\n{}", backtrace))]
// EmptyRowGroup { backtrace: Backtrace },
}

define_result!(Error);

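
The new variants follow the snafu convention used throughout this commit: `*NoCause` variants are built directly from their context selector with `.fail()`, while `*WithCause` variants attach a boxed source error via `ResultExt::context`. A self-contained sketch of the pattern, mirroring the commit's own variant definitions and assuming the snafu 0.6-era selector naming the repository uses; the `encode` function and its checks are illustrative only:

```rust
use snafu::{Backtrace, ResultExt, Snafu};

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
    #[snafu(display(
        "Failed to convert write request to pb, msg:{}.\nBacktrace:\n{}",
        msg,
        backtrace
    ))]
    WriteRequestToPbNoCause { msg: String, backtrace: Backtrace },

    #[snafu(display("Failed to convert write request to pb, msg:{}, err:{}", msg, source))]
    WriteRequestToPbWithCause {
        msg: String,
        source: Box<dyn std::error::Error + Send + Sync>,
    },
}

pub type Result<T> = std::result::Result<T, Error>;

fn encode(payload: &[u8]) -> Result<Vec<u8>> {
    if payload.is_empty() {
        // No underlying error: construct the variant from its context selector.
        return WriteRequestToPbNoCause {
            msg: "empty payload",
        }
        .fail();
    }

    // With an underlying error: box it and attach context via `ResultExt::context`.
    std::str::from_utf8(payload)
        .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
        .context(WriteRequestToPbWithCause {
            msg: "payload is not valid utf8",
        })?;

    Ok(payload.to_vec())
}

fn main() {
    println!("{:?}", encode(b"hello"));
    println!("{:?}", encode(b""));
}
```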