Skip to content

Commit

Permalink
feat(common): Add support for DataType::Serial (risingwavelabs#8392)
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky authored Mar 15, 2023
1 parent f907452 commit 61191c2
Show file tree
Hide file tree
Showing 14 changed files with 53 additions and 3 deletions.
6 changes: 6 additions & 0 deletions dashboard/proto/gen/data.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions proto/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ message DataType {
LIST = 16;
BYTEA = 17;
JSONB = 18;
SERIAL = 19;
}
TypeName type_name = 1;
// Data length for char.
Expand Down
2 changes: 2 additions & 0 deletions src/common/src/hash/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use super::HashKey;
use crate::array::serial_array::Serial;
use crate::hash;
use crate::types::DataType;

Expand Down Expand Up @@ -90,6 +91,7 @@ fn hash_key_size(data_type: &DataType) -> HashKeySize {
DataType::Int16 => HashKeySize::Fixed(size_of::<i16>()),
DataType::Int32 => HashKeySize::Fixed(size_of::<i32>()),
DataType::Int64 => HashKeySize::Fixed(size_of::<i64>()),
DataType::Serial => HashKeySize::Fixed(size_of::<Serial>()),
DataType::Float32 => HashKeySize::Fixed(size_of::<OrderedF32>()),
DataType::Float64 => HashKeySize::Fixed(size_of::<OrderedF64>()),
DataType::Decimal => HashKeySize::Fixed(size_of::<Decimal>()),
Expand Down
21 changes: 21 additions & 0 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ pub enum DataType {
#[display("jsonb")]
#[from_str(regex = "(?i)^jsonb$")]
Jsonb,
#[display("serial")]
#[from_str(regex = "(?i)^serial$")]
Serial,
}

impl std::str::FromStr for Box<DataType> {
Expand All @@ -148,6 +151,7 @@ impl DataTypeName {
| DataTypeName::Int16
| DataTypeName::Int32
| DataTypeName::Int64
| DataTypeName::Serial
| DataTypeName::Decimal
| DataTypeName::Float32
| DataTypeName::Float64
Expand All @@ -170,6 +174,7 @@ impl DataTypeName {
DataTypeName::Int16 => DataType::Int16,
DataTypeName::Int32 => DataType::Int32,
DataTypeName::Int64 => DataType::Int64,
DataTypeName::Serial => DataType::Serial,
DataTypeName::Decimal => DataType::Decimal,
DataTypeName::Float32 => DataType::Float32,
DataTypeName::Float64 => DataType::Float64,
Expand Down Expand Up @@ -208,6 +213,7 @@ impl From<&ProstDataType> for DataType {
TypeName::Int16 => DataType::Int16,
TypeName::Int32 => DataType::Int32,
TypeName::Int64 => DataType::Int64,
TypeName::Serial => DataType::Serial,
TypeName::Float => DataType::Float32,
TypeName::Double => DataType::Float64,
TypeName::Boolean => DataType::Boolean,
Expand Down Expand Up @@ -242,6 +248,7 @@ impl DataType {
DataType::Int16 => PrimitiveArrayBuilder::<i16>::new(capacity).into(),
DataType::Int32 => PrimitiveArrayBuilder::<i32>::new(capacity).into(),
DataType::Int64 => PrimitiveArrayBuilder::<i64>::new(capacity).into(),
DataType::Serial => PrimitiveArrayBuilder::<Serial>::new(capacity).into(),
DataType::Float32 => PrimitiveArrayBuilder::<OrderedF32>::new(capacity).into(),
DataType::Float64 => PrimitiveArrayBuilder::<OrderedF64>::new(capacity).into(),
DataType::Decimal => DecimalArrayBuilder::new(capacity).into(),
Expand Down Expand Up @@ -271,6 +278,7 @@ impl DataType {
DataType::Int16 => TypeName::Int16,
DataType::Int32 => TypeName::Int32,
DataType::Int64 => TypeName::Int64,
DataType::Serial => TypeName::Serial,
DataType::Float32 => TypeName::Float,
DataType::Float64 => TypeName::Double,
DataType::Boolean => TypeName::Boolean,
Expand Down Expand Up @@ -313,6 +321,7 @@ impl DataType {
DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::Serial
| DataType::Float32
| DataType::Float64
| DataType::Decimal
Expand Down Expand Up @@ -353,6 +362,7 @@ impl DataType {
DataType::Int16 => ScalarImpl::Int16(i16::MIN),
DataType::Int32 => ScalarImpl::Int32(i32::MIN),
DataType::Int64 => ScalarImpl::Int64(i64::MIN),
DataType::Serial => ScalarImpl::Serial(Serial::from(i64::MIN)),
DataType::Float32 => ScalarImpl::Float32(OrderedF32::neg_infinity()),
DataType::Float64 => ScalarImpl::Float64(OrderedF64::neg_infinity()),
DataType::Boolean => ScalarImpl::Bool(false),
Expand Down Expand Up @@ -780,6 +790,10 @@ impl ScalarImpl {
i64::from_sql(&Type::INT8, bytes)
.map_err(|err| ErrorCode::InvalidInputSyntax(err.to_string()))?,
),
DataType::Serial => Self::Serial(Serial::from(
i64::from_sql(&Type::INT8, bytes)
.map_err(|err| ErrorCode::InvalidInputSyntax(err.to_string()))?,
)),
DataType::Float32 => Self::Float32(
f32::from_sql(&Type::FLOAT4, bytes)
.map_err(|err| ErrorCode::InvalidInputSyntax(err.to_string()))?
Expand Down Expand Up @@ -862,6 +876,9 @@ impl ScalarImpl {
DataType::Int64 => Self::Int64(i64::from_str(str).map_err(|_| {
ErrorCode::InvalidInputSyntax(format!("Invalid param string: {}", str))
})?),
DataType::Serial => Self::Serial(Serial::from(i64::from_str(str).map_err(|_| {
ErrorCode::InvalidInputSyntax(format!("Invalid param string: {}", str))
})?)),
DataType::Float32 => Self::Float32(
f32::from_str(str)
.map_err(|_| {
Expand Down Expand Up @@ -1050,6 +1067,7 @@ impl ScalarImpl {
Ty::Int16 => Self::Int16(i16::deserialize(de)?),
Ty::Int32 => Self::Int32(i32::deserialize(de)?),
Ty::Int64 => Self::Int64(i64::deserialize(de)?),
Ty::Serial => Self::Serial(Serial::from(i64::deserialize(de)?)),
Ty::Float32 => Self::Float32(f32::deserialize(de)?.into()),
Ty::Float64 => Self::Float64(f64::deserialize(de)?.into()),
Ty::Varchar => Self::Utf8(Box::<str>::deserialize(de)?),
Expand Down Expand Up @@ -1100,6 +1118,7 @@ impl ScalarImpl {
DataType::Int16 => size_of::<i16>(),
DataType::Int32 => size_of::<i32>(),
DataType::Int64 => size_of::<i64>(),
DataType::Serial => size_of::<Serial>(),
DataType::Float32 => size_of::<OrderedF32>(),
DataType::Float64 => size_of::<OrderedF64>(),
DataType::Date => size_of::<NaiveDateWrapper>(),
Expand Down Expand Up @@ -1174,6 +1193,7 @@ macro_rules! for_all_type_pairs {
{ Interval, Interval },
{ Decimal, Decimal },
{ Jsonb, Jsonb },
{ Serial, Serial },
{ List, List },
{ Struct, Struct }
}
Expand Down Expand Up @@ -1360,6 +1380,7 @@ mod tests {
DataTypeName::Int16 => (ScalarImpl::Int16(233), DataType::Int16),
DataTypeName::Int32 => (ScalarImpl::Int32(233333), DataType::Int32),
DataTypeName::Int64 => (ScalarImpl::Int64(233333333333), DataType::Int64),
DataTypeName::Serial => (ScalarImpl::Serial(233333333333.into()), DataType::Serial),
DataTypeName::Float32 => (ScalarImpl::Float32(23.33.into()), DataType::Float32),
DataTypeName::Float64 => (
ScalarImpl::Float64(23.333333333333.into()),
Expand Down
3 changes: 3 additions & 0 deletions src/common/src/types/postgres_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ impl DataType {
DataType::Int16 => 2,
DataType::Int32 | DataType::Float32 | DataType::Date => 4,
DataType::Int64
| DataType::Serial
| DataType::Float64
| DataType::Timestamp
| DataType::Timestamptz
Expand Down Expand Up @@ -114,6 +115,7 @@ impl DataType {
DataType::Int16 => 21,
DataType::Int32 => 23,
DataType::Int64 => 20,
DataType::Serial => 20,
DataType::Float32 => 700,
DataType::Float64 => 701,
DataType::Decimal => 1700,
Expand All @@ -133,6 +135,7 @@ impl DataType {
DataType::Int16 => 1005,
DataType::Int32 => 1007,
DataType::Int64 => 1016,
DataType::Serial => 1016,
DataType::Float32 => 1021,
DataType::Float64 => 1022,
DataType::Decimal => 1231,
Expand Down
4 changes: 3 additions & 1 deletion src/common/src/util/value_encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use either::{for_both, Either};
use enum_as_inner::EnumAsInner;
use itertools::Itertools;

use crate::array::{JsonbVal, ListRef, ListValue, StructRef, StructValue};
use crate::array::{serial_array, JsonbVal, ListRef, ListValue, StructRef, StructValue};
use crate::catalog::ColumnId;
use crate::row::{Row, RowDeserializer as BasicDeserializer};
use crate::types::struct_type::StructType;
Expand All @@ -35,6 +35,7 @@ use crate::types::{

pub mod error;
use error::ValueEncodingError;
use serial_array::Serial;

use self::column_aware_row_encoding::ColumnAwareSerde;
pub mod column_aware_row_encoding;
Expand Down Expand Up @@ -280,6 +281,7 @@ fn deserialize_value(ty: &DataType, data: &mut impl Buf) -> Result<ScalarImpl> {
DataType::Int16 => ScalarImpl::Int16(data.get_i16_le()),
DataType::Int32 => ScalarImpl::Int32(data.get_i32_le()),
DataType::Int64 => ScalarImpl::Int64(data.get_i64_le()),
DataType::Serial => ScalarImpl::Serial(Serial::from(data.get_i64_le())),
DataType::Float32 => ScalarImpl::Float32(OrderedF32::from(data.get_f32_le())),
DataType::Float64 => ScalarImpl::Float64(OrderedF64::from(data.get_f64_le())),
DataType::Varchar => ScalarImpl::Utf8(deserialize_str(data)?),
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/parser/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ fn do_parse_simd_json_value(dtype: &DataType, v: &BorrowedValue<'_>) -> Result<S
DataType::Int16 => ensure_i16!(v, i16).into(),
DataType::Int32 => ensure_i32!(v, i32).into(),
DataType::Int64 => ensure_i64!(v, i64).into(),
DataType::Serial => anyhow::bail!("serial should not be parsed"),
// when f32 overflows, the value is converted to `inf` which is inappropriate
DataType::Float32 => {
let scalar_val = ScalarImpl::Float32((simd_json_ensure_float!(v, f32) as f32).into());
Expand Down
2 changes: 2 additions & 0 deletions src/expr/src/expr/expr_binary_nullable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use std::sync::Arc;

use risingwave_common::array::serial_array::SerialArray;
use risingwave_common::array::*;
use risingwave_common::buffer::Bitmap;
use risingwave_common::row::OwnedRow;
Expand Down Expand Up @@ -201,6 +202,7 @@ fn build_array_access_expr(
DataType::Int16 => array_access_expression!(I16Array),
DataType::Int32 => array_access_expression!(I32Array),
DataType::Int64 => array_access_expression!(I64Array),
DataType::Serial => array_access_expression!(SerialArray),
DataType::Float32 => array_access_expression!(F32Array),
DataType::Float64 => array_access_expression!(F64Array),
DataType::Decimal => array_access_expression!(DecimalArray),
Expand Down
1 change: 1 addition & 0 deletions src/expr/src/vector_op/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ pub fn literal_parsing(
DataType::Int16 => str_parse::<i16>(s)?.into(),
DataType::Int32 => str_parse::<i32>(s)?.into(),
DataType::Int64 => str_parse::<i64>(s)?.into(),
DataType::Serial => return Err(None),
DataType::Decimal => str_parse::<Decimal>(s)?.into(),
DataType::Float32 => str_parse::<OrderedF32>(s)?.into(),
DataType::Float64 => str_parse::<OrderedF64>(s)?.into(),
Expand Down
7 changes: 7 additions & 0 deletions src/frontend/src/binder/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,13 @@ pub fn bind_data_type(data_type: &AstDataType) -> Result<DataType> {
"float8" => DataType::Float64,
"timestamptz" => DataType::Timestamptz,
"jsonb" => DataType::Jsonb,
"serial" => {
return Err(ErrorCode::NotSupported(
"Column type SERIAL is not supported".into(),
"Please remove the SERIAL column".into(),
)
.into())
}
_ => return Err(new_err().into()),
}
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/expr/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl std::fmt::Debug for Literal {
DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::Serial
| DataType::Decimal
| DataType::Float32
| DataType::Float64 => write!(f, "{}", v.as_scalar_ref_impl().to_text()),
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/rule/index_selection_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use std::collections::{BTreeMap, HashMap, HashSet};
use std::rc::Rc;

use itertools::Itertools;
use risingwave_common::array::serial_array::Serial;
use risingwave_common::catalog::Schema;
use risingwave_common::types::{
DataType, Decimal, IntervalUnit, NaiveDateTimeWrapper, NaiveDateWrapper, NaiveTimeWrapper,
Expand Down Expand Up @@ -704,6 +705,7 @@ impl<'a> TableScanIoEstimator<'a> {
DataType::Int16 => size_of::<i16>(),
DataType::Int32 => size_of::<i32>(),
DataType::Int64 => size_of::<i64>(),
DataType::Serial => size_of::<Serial>(),
DataType::Float32 => size_of::<f32>(),
DataType::Float64 => size_of::<f64>(),
DataType::Decimal => size_of::<Decimal>(),
Expand Down
1 change: 1 addition & 0 deletions src/tests/sqlsmith/src/sql_gen/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub(super) fn data_type_to_ast_data_type(data_type: &DataType) -> AstDataType {
DataType::Int16 => AstDataType::SmallInt,
DataType::Int32 => AstDataType::Int,
DataType::Int64 => AstDataType::BigInt,
DataType::Serial => unreachable!("serial should not be generated"),
DataType::Decimal => AstDataType::Decimal(None, None),
DataType::Float32 => AstDataType::Real,
DataType::Float64 => AstDataType::Double,
Expand Down
4 changes: 2 additions & 2 deletions src/utils/pgwire/src/pg_extended.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ impl PreparedStatement {
};
format!("'{}'::JSONB", tmp)
}
DataType::Struct(_) | DataType::List { .. } => {
DataType::Serial | DataType::Struct(_) | DataType::List { .. } => {
return Err(PsqlError::Internal(anyhow!(
"Unsupported param type {:?}",
type_oid
Expand Down Expand Up @@ -557,7 +557,7 @@ impl PreparedStatement {
}
DataType::Interval => params.push("'2 months ago'::interval".to_string()),
DataType::Jsonb => params.push("'null'::JSONB".to_string()),
DataType::Struct(_) | DataType::List { .. } => {
DataType::Serial | DataType::Struct(_) | DataType::List { .. } => {
return Err(PsqlError::Internal(anyhow!(
"Unsupported param type {:?}",
oid
Expand Down

0 comments on commit 61191c2

Please sign in to comment.