Skip to content

Commit

Permalink
Fix regression in raw_json schema conversion (#616)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed May 8, 2024
1 parent f54a6b2 commit 052ca58
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 40 deletions.
89 changes: 51 additions & 38 deletions crates/arroyo-rpc/src/api_types/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::fmt::{Display, Formatter};
use std::sync::Arc;

use crate::df::{ArroyoSchema, ArroyoSchemaRef};
+ use arroyo_types::ArroyoExtensionType;
use utoipa::{IntoParams, ToSchema};

#[derive(Serialize, Deserialize, Clone, Debug, ToSchema)]
Expand Down Expand Up @@ -124,60 +125,72 @@ pub struct SourceField {

impl From<SourceField> for Field {
fn from(f: SourceField) -> Self {
- let t = match f.field_type.r#type {
+ let (t, ext) = match f.field_type.r#type {
FieldType::Primitive(pt) => match pt {
- PrimitiveType::Int32 => DataType::Int32,
- PrimitiveType::Int64 => DataType::Int64,
- PrimitiveType::UInt32 => DataType::UInt32,
- PrimitiveType::UInt64 => DataType::UInt64,
- PrimitiveType::F32 => DataType::Float32,
- PrimitiveType::F64 => DataType::Float64,
- PrimitiveType::Bool => DataType::Boolean,
- PrimitiveType::String => DataType::Utf8,
- PrimitiveType::Bytes => DataType::Binary,
- PrimitiveType::UnixMillis => DataType::Timestamp(TimeUnit::Millisecond, None),
- PrimitiveType::UnixMicros => DataType::Timestamp(TimeUnit::Microsecond, None),
- PrimitiveType::UnixNanos => DataType::Timestamp(TimeUnit::Nanosecond, None),
- PrimitiveType::DateTime => DataType::Timestamp(TimeUnit::Microsecond, None),
- PrimitiveType::Json => DataType::Utf8,
+ PrimitiveType::Int32 => (DataType::Int32, None),
+ PrimitiveType::Int64 => (DataType::Int64, None),
+ PrimitiveType::UInt32 => (DataType::UInt32, None),
+ PrimitiveType::UInt64 => (DataType::UInt64, None),
+ PrimitiveType::F32 => (DataType::Float32, None),
+ PrimitiveType::F64 => (DataType::Float64, None),
+ PrimitiveType::Bool => (DataType::Boolean, None),
+ PrimitiveType::String => (DataType::Utf8, None),
+ PrimitiveType::Bytes => (DataType::Binary, None),
+ PrimitiveType::UnixMillis => {
+ (DataType::Timestamp(TimeUnit::Millisecond, None), None)
+ }
+ PrimitiveType::UnixMicros => {
+ (DataType::Timestamp(TimeUnit::Microsecond, None), None)
+ }
+ PrimitiveType::UnixNanos => (DataType::Timestamp(TimeUnit::Nanosecond, None), None),
+ PrimitiveType::DateTime => (DataType::Timestamp(TimeUnit::Microsecond, None), None),
+ PrimitiveType::Json => (DataType::Utf8, Some(ArroyoExtensionType::JSON)),
},
- FieldType::Struct(s) => DataType::Struct(Fields::from(
- s.fields
- .into_iter()
- .map(|t| t.into())
- .collect::<Vec<Field>>(),
- )),
- FieldType::List(t) => DataType::List(Arc::new((*t).into())),
+ FieldType::Struct(s) => (
+ DataType::Struct(Fields::from(
+ s.fields
+ .into_iter()
+ .map(|t| t.into())
+ .collect::<Vec<Field>>(),
+ )),
+ None,
+ ),
+ FieldType::List(t) => (DataType::List(Arc::new((*t).into())), None),
};

- Field::new(f.field_name, t, f.nullable)
+ ArroyoExtensionType::add_metadata(ext, Field::new(f.field_name, t, f.nullable))
}
}

impl TryFrom<Field> for SourceField {
type Error = String;

fn try_from(f: Field) -> Result<Self, Self::Error> {
- let field_type = match f.data_type() {
- DataType::Boolean => FieldType::Primitive(PrimitiveType::Bool),
- DataType::Int32 => FieldType::Primitive(PrimitiveType::Int32),
- DataType::Int64 => FieldType::Primitive(PrimitiveType::Int64),
- DataType::UInt32 => FieldType::Primitive(PrimitiveType::UInt32),
- DataType::UInt64 => FieldType::Primitive(PrimitiveType::UInt64),
- DataType::Float32 => FieldType::Primitive(PrimitiveType::F32),
- DataType::Float64 => FieldType::Primitive(PrimitiveType::F64),
- DataType::Binary | DataType::LargeBinary => FieldType::Primitive(PrimitiveType::Bytes),
- DataType::Timestamp(TimeUnit::Millisecond, _) => {
+ let field_type = match (f.data_type(), ArroyoExtensionType::from_map(f.metadata())) {
+ (DataType::Boolean, None) => FieldType::Primitive(PrimitiveType::Bool),
+ (DataType::Int32, None) => FieldType::Primitive(PrimitiveType::Int32),
+ (DataType::Int64, None) => FieldType::Primitive(PrimitiveType::Int64),
+ (DataType::UInt32, None) => FieldType::Primitive(PrimitiveType::UInt32),
+ (DataType::UInt64, None) => FieldType::Primitive(PrimitiveType::UInt64),
+ (DataType::Float32, None) => FieldType::Primitive(PrimitiveType::F32),
+ (DataType::Float64, None) => FieldType::Primitive(PrimitiveType::F64),
+ (DataType::Binary, None) | (DataType::LargeBinary, None) => {
+ FieldType::Primitive(PrimitiveType::Bytes)
+ }
+ (DataType::Timestamp(TimeUnit::Millisecond, _), None) => {
FieldType::Primitive(PrimitiveType::UnixMillis)
}
- DataType::Timestamp(TimeUnit::Microsecond, _) => {
+ (DataType::Timestamp(TimeUnit::Microsecond, _), None) => {
FieldType::Primitive(PrimitiveType::UnixMicros)
}
- DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+ (DataType::Timestamp(TimeUnit::Nanosecond, _), None) => {
FieldType::Primitive(PrimitiveType::UnixNanos)
}
- DataType::Utf8 => FieldType::Primitive(PrimitiveType::String),
- DataType::Struct(fields) => {
+ (DataType::Utf8, None) => FieldType::Primitive(PrimitiveType::String),
+ (DataType::Utf8, Some(ArroyoExtensionType::JSON)) => {
+ FieldType::Primitive(PrimitiveType::Json)
+ }
+ (DataType::Struct(fields), None) => {
let fields: Result<_, String> = fields
.into_iter()
.map(|f| (**f).clone().try_into())
Expand All @@ -190,7 +203,7 @@ impl TryFrom<Field> for SourceField {

FieldType::Struct(st)
}
- DataType::List(item) => FieldType::List(Box::new((**item).clone().try_into()?)),
+ (DataType::List(item), None) => FieldType::List(Box::new((**item).clone().try_into()?)),
dt => {
return Err(format!("Unsupported data type {:?}", dt));
}
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ pub fn primitive_to_sql(primitive_type: PrimitiveType) -> &'static str {
| PrimitiveType::UnixMicros
| PrimitiveType::UnixNanos
| PrimitiveType::DateTime => "TIMESTAMP",
- PrimitiveType::Json => "JSONB",
+ PrimitiveType::Json => "JSON",
}
}

Expand Down
5 changes: 4 additions & 1 deletion crates/arroyo-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,10 @@ pub struct RawJson {
}

pub fn raw_schema() -> Schema {
- Schema::new(vec![Field::new("value", DataType::Utf8, false)])
+ Schema::new(vec![ArroyoExtensionType::add_metadata(
+ Some(ArroyoExtensionType::JSON),
+ Field::new("value", DataType::Utf8, false),
+ )])
}

pub static MESSAGES_RECV: &str = "arroyo_worker_messages_recv";
Expand Down

0 comments on commit 052ca58

Please sign in to comment.