Skip to content

Commit

Permalink
[meta] feature: protobuf message has to persist MIN_COMPATIBLE_VER โ€ฆ
Browse files Browse the repository at this point in the history
โ€ฆin it

This way to let old query be able to decide if it is safe to load data
written by a newer query executable.

- Fix: #5784
  • Loading branch information
drmingdrmer committed Jun 5, 2022
1 parent 25531a7 commit 80e09f1
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 46 deletions.
7 changes: 5 additions & 2 deletions common/proto-conv/src/config_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ use common_protos::pb;
use crate::check_ver;
use crate::FromToProto;
use crate::Incompatible;
use crate::MIN_COMPATIBLE_VER;
use crate::VER;

impl FromToProto<pb::S3StorageConfig> for StorageParams {
fn from_pb(p: pb::S3StorageConfig) -> Result<Self, Incompatible>
where Self: Sized {
// TODO: config will have it's own version flags in the future.
check_ver(p.version)?;
check_ver(p.version, p.min_compatible)?;

Ok(Self::S3(StorageS3Config {
region: p.region,
Expand All @@ -43,6 +44,7 @@ impl FromToProto<pb::S3StorageConfig> for StorageParams {
if let StorageParams::S3(v) = self {
Ok(pb::S3StorageConfig {
version: VER,
min_compatible: MIN_COMPATIBLE_VER,
region: v.region.clone(),
endpoint_url: v.endpoint_url.clone(),
access_key_id: v.access_key_id.clone(),
Expand All @@ -63,7 +65,7 @@ impl FromToProto<pb::FsStorageConfig> for StorageParams {
fn from_pb(p: pb::FsStorageConfig) -> Result<Self, Incompatible>
where Self: Sized {
// TODO: config will have it's own version flags in the future.
check_ver(p.version)?;
check_ver(p.version, p.min_compatible)?;

Ok(Self::Fs(StorageFsConfig { root: p.root }))
}
Expand All @@ -72,6 +74,7 @@ impl FromToProto<pb::FsStorageConfig> for StorageParams {
if let StorageParams::Fs(v) = self {
Ok(pb::FsStorageConfig {
version: VER,
min_compatible: MIN_COMPATIBLE_VER,
root: v.root.clone(),
})
} else {
Expand Down
66 changes: 52 additions & 14 deletions common/proto-conv/src/data_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ use num::FromPrimitive;
use crate::check_ver;
use crate::FromToProto;
use crate::Incompatible;
use crate::MIN_COMPATIBLE_VER;
use crate::VER;

impl FromToProto<pb::DataSchema> for dv::DataSchema {
fn from_pb(p: pb::DataSchema) -> Result<Self, Incompatible> {
check_ver(p.ver)?;
check_ver(p.ver, p.min_compatible)?;

let mut fs = Vec::with_capacity(p.fields.len());
for f in p.fields.into_iter() {
Expand All @@ -50,6 +51,7 @@ impl FromToProto<pb::DataSchema> for dv::DataSchema {

let p = pb::DataSchema {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
fields: fs,
metadata: self.meta().clone(),
};
Expand All @@ -59,7 +61,7 @@ impl FromToProto<pb::DataSchema> for dv::DataSchema {

impl FromToProto<pb::DataField> for dv::DataField {
fn from_pb(p: pb::DataField) -> Result<Self, Incompatible> {
check_ver(p.ver)?;
check_ver(p.ver, p.min_compatible)?;

let v = dv::DataField::new(
&p.name,
Expand All @@ -74,6 +76,7 @@ impl FromToProto<pb::DataField> for dv::DataField {
fn to_pb(&self) -> Result<pb::DataField, Incompatible> {
let p = pb::DataField {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
name: self.name().clone(),
default_expr: self.default_expr().cloned(),
data_type: Some(self.data_type().to_pb()?),
Expand All @@ -84,7 +87,7 @@ impl FromToProto<pb::DataField> for dv::DataField {

impl FromToProto<pb::DataType> for dv::DataTypeImpl {
fn from_pb(p: pb::DataType) -> Result<Self, Incompatible> {
check_ver(p.ver)?;
check_ver(p.ver, p.min_compatible)?;

let dt = match p.dt {
None => {
Expand Down Expand Up @@ -136,90 +139,103 @@ impl FromToProto<pb::DataType> for dv::DataTypeImpl {

let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::NullableType(Box::new(inn))),
};
Ok(v)
}
dv::DataTypeImpl::Boolean(_) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::BoolType(pb::Empty {})),
};
Ok(v)
}
dv::DataTypeImpl::Int8(_) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::Int8Type(pb::Empty {})),
};
Ok(v)
}
dv::DataTypeImpl::Int16(_) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::Int16Type(pb::Empty {})),
};
Ok(v)
}
dv::DataTypeImpl::Int32(_) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::Int32Type(pb::Empty {})),
};
Ok(v)
}
dv::DataTypeImpl::Int64(_) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::Int64Type(pb::Empty {})),
};
Ok(v)
}
dv::DataTypeImpl::UInt8(_) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::Uint8Type(pb::Empty {})),
};
Ok(v)
}
dv::DataTypeImpl::UInt16(_) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::Uint16Type(pb::Empty {})),
};
Ok(v)
}
dv::DataTypeImpl::UInt32(_) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::Uint32Type(pb::Empty {})),
};
Ok(v)
}
dv::DataTypeImpl::UInt64(_) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::Uint64Type(pb::Empty {})),
};
Ok(v)
}
dv::DataTypeImpl::Float32(_) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::Float32Type(pb::Empty {})),
};
Ok(v)
}
dv::DataTypeImpl::Float64(_) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::Float64Type(pb::Empty {})),
};
Ok(v)
}
dv::DataTypeImpl::Date(_x) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::DateType(pb::Empty {})),
};
Ok(v)
Expand All @@ -229,13 +245,15 @@ impl FromToProto<pb::DataType> for dv::DataTypeImpl {

let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::TimestampType(inn)),
};
Ok(v)
}
dv::DataTypeImpl::String(_x) => {
let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::StringType(pb::Empty {})),
};
Ok(v)
Expand All @@ -245,6 +263,7 @@ impl FromToProto<pb::DataType> for dv::DataTypeImpl {

let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::StructType(inn)),
};
Ok(v)
Expand All @@ -254,6 +273,7 @@ impl FromToProto<pb::DataType> for dv::DataTypeImpl {

let v = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::ArrayType(Box::new(inn))),
};
Ok(v)
Expand All @@ -263,6 +283,7 @@ impl FromToProto<pb::DataType> for dv::DataTypeImpl {

let p = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::VariantType(inn)),
};
Ok(p)
Expand All @@ -272,6 +293,7 @@ impl FromToProto<pb::DataType> for dv::DataTypeImpl {

let p = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::VariantArrayType(inn)),
};
Ok(p)
Expand All @@ -281,6 +303,7 @@ impl FromToProto<pb::DataType> for dv::DataTypeImpl {

let p = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::VariantObjectType(inn)),
};
Ok(p)
Expand All @@ -290,6 +313,7 @@ impl FromToProto<pb::DataType> for dv::DataTypeImpl {

let p = pb::DataType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
dt: Some(Dt::IntervalType(inn)),
};
Ok(p)
Expand All @@ -301,7 +325,7 @@ impl FromToProto<pb::DataType> for dv::DataTypeImpl {
impl FromToProto<pb::NullableType> for dv::NullableType {
fn from_pb(p: pb::NullableType) -> Result<Self, Incompatible>
where Self: Sized {
check_ver(p.ver)?;
check_ver(p.ver, p.min_compatible)?;

let inner = p.inner.ok_or_else(|| Incompatible {
reason: "NullableType.inner can not be None".to_string(),
Expand All @@ -318,6 +342,7 @@ impl FromToProto<pb::NullableType> for dv::NullableType {

let p = pb::NullableType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
inner: Some(Box::new(inner_pb_type)),
};

Expand All @@ -328,14 +353,15 @@ impl FromToProto<pb::NullableType> for dv::NullableType {
impl FromToProto<pb::Timestamp> for dv::TimestampType {
fn from_pb(p: pb::Timestamp) -> Result<Self, Incompatible>
where Self: Sized {
check_ver(p.ver)?;
check_ver(p.ver, p.min_compatible)?;
let v = dv::TimestampType::create(p.precision as usize);
Ok(v)
}

fn to_pb(&self) -> Result<pb::Timestamp, Incompatible> {
let p = pb::Timestamp {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
precision: self.precision() as u64,
// tz: self.tz().cloned(),
};
Expand All @@ -347,7 +373,7 @@ impl FromToProto<pb::Timestamp> for dv::TimestampType {
impl FromToProto<pb::Struct> for dv::StructType {
fn from_pb(p: pb::Struct) -> Result<Self, Incompatible>
where Self: Sized {
check_ver(p.ver)?;
check_ver(p.ver, p.min_compatible)?;
let names = p.names.clone();

let mut types = Vec::with_capacity(p.types.len());
Expand All @@ -369,6 +395,7 @@ impl FromToProto<pb::Struct> for dv::StructType {

let p = pb::Struct {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,

names,
types,
Expand All @@ -381,7 +408,7 @@ impl FromToProto<pb::Struct> for dv::StructType {
impl FromToProto<pb::Array> for dv::ArrayType {
fn from_pb(p: pb::Array) -> Result<Self, Incompatible>
where Self: Sized {
check_ver(p.ver)?;
check_ver(p.ver, p.min_compatible)?;

let inner = p.inner.ok_or_else(|| Incompatible {
reason: "Array.inner can not be None".to_string(),
Expand All @@ -398,6 +425,7 @@ impl FromToProto<pb::Array> for dv::ArrayType {

let p = pb::Array {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
inner: Some(Box::new(inner_pb_type)),
};

Expand All @@ -408,27 +436,33 @@ impl FromToProto<pb::Array> for dv::ArrayType {
impl FromToProto<pb::VariantArray> for dv::VariantArrayType {
fn from_pb(p: pb::VariantArray) -> Result<Self, Incompatible>
where Self: Sized {
check_ver(p.ver)?;
check_ver(p.ver, p.min_compatible)?;

Ok(Self {})
}

fn to_pb(&self) -> Result<pb::VariantArray, Incompatible> {
let p = pb::VariantArray { ver: VER };
let p = pb::VariantArray {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
};
Ok(p)
}
}

impl FromToProto<pb::VariantObject> for dv::VariantObjectType {
fn from_pb(p: pb::VariantObject) -> Result<Self, Incompatible>
where Self: Sized {
check_ver(p.ver)?;
check_ver(p.ver, p.min_compatible)?;

Ok(Self {})
}

fn to_pb(&self) -> Result<pb::VariantObject, Incompatible> {
let p = pb::VariantObject { ver: VER };
let p = pb::VariantObject {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
};
Ok(p)
}
}
Expand Down Expand Up @@ -467,7 +501,7 @@ impl FromToProto<pb::IntervalKind> for dv::IntervalKind {
impl FromToProto<pb::IntervalType> for dv::IntervalType {
fn from_pb(p: pb::IntervalType) -> Result<Self, Incompatible>
where Self: Sized {
check_ver(p.ver)?;
check_ver(p.ver, p.min_compatible)?;

let pb_kind: pb::IntervalKind =
FromPrimitive::from_i32(p.kind).ok_or_else(|| Incompatible {
Expand All @@ -482,6 +516,7 @@ impl FromToProto<pb::IntervalType> for dv::IntervalType {
let pb_kind = self.kind().to_pb()?;
let p = pb::IntervalType {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
kind: pb_kind as i32,
};
Ok(p)
Expand All @@ -491,13 +526,16 @@ impl FromToProto<pb::IntervalType> for dv::IntervalType {
impl FromToProto<pb::Variant> for dv::VariantType {
fn from_pb(p: pb::Variant) -> Result<Self, Incompatible>
where Self: Sized {
check_ver(p.ver)?;
check_ver(p.ver, p.min_compatible)?;

Ok(Self {})
}

fn to_pb(&self) -> Result<pb::Variant, Incompatible> {
let p = pb::Variant { ver: VER };
let p = pb::Variant {
ver: VER,
min_compatible: MIN_COMPATIBLE_VER,
};
Ok(p)
}
}
Expand Down
Loading

0 comments on commit 80e09f1

Please sign in to comment.