diff --git a/crates/arroyo-rpc/src/api_types/connections.rs b/crates/arroyo-rpc/src/api_types/connections.rs index 9c6750dbf..2dd05b671 100644 --- a/crates/arroyo-rpc/src/api_types/connections.rs +++ b/crates/arroyo-rpc/src/api_types/connections.rs @@ -8,6 +8,7 @@ use std::fmt::{Display, Formatter}; use std::sync::Arc; use crate::df::{ArroyoSchema, ArroyoSchemaRef}; +use arroyo_types::ArroyoExtensionType; use utoipa::{IntoParams, ToSchema}; #[derive(Serialize, Deserialize, Clone, Debug, ToSchema)] @@ -124,33 +125,40 @@ pub struct SourceField { impl From for Field { fn from(f: SourceField) -> Self { - let t = match f.field_type.r#type { + let (t, ext) = match f.field_type.r#type { FieldType::Primitive(pt) => match pt { - PrimitiveType::Int32 => DataType::Int32, - PrimitiveType::Int64 => DataType::Int64, - PrimitiveType::UInt32 => DataType::UInt32, - PrimitiveType::UInt64 => DataType::UInt64, - PrimitiveType::F32 => DataType::Float32, - PrimitiveType::F64 => DataType::Float64, - PrimitiveType::Bool => DataType::Boolean, - PrimitiveType::String => DataType::Utf8, - PrimitiveType::Bytes => DataType::Binary, - PrimitiveType::UnixMillis => DataType::Timestamp(TimeUnit::Millisecond, None), - PrimitiveType::UnixMicros => DataType::Timestamp(TimeUnit::Microsecond, None), - PrimitiveType::UnixNanos => DataType::Timestamp(TimeUnit::Nanosecond, None), - PrimitiveType::DateTime => DataType::Timestamp(TimeUnit::Microsecond, None), - PrimitiveType::Json => DataType::Utf8, + PrimitiveType::Int32 => (DataType::Int32, None), + PrimitiveType::Int64 => (DataType::Int64, None), + PrimitiveType::UInt32 => (DataType::UInt32, None), + PrimitiveType::UInt64 => (DataType::UInt64, None), + PrimitiveType::F32 => (DataType::Float32, None), + PrimitiveType::F64 => (DataType::Float64, None), + PrimitiveType::Bool => (DataType::Boolean, None), + PrimitiveType::String => (DataType::Utf8, None), + PrimitiveType::Bytes => (DataType::Binary, None), + PrimitiveType::UnixMillis => { + (DataType::Timestamp(TimeUnit::Millisecond, None), None) + } + PrimitiveType::UnixMicros => { + (DataType::Timestamp(TimeUnit::Microsecond, None), None) + } + PrimitiveType::UnixNanos => (DataType::Timestamp(TimeUnit::Nanosecond, None), None), + PrimitiveType::DateTime => (DataType::Timestamp(TimeUnit::Microsecond, None), None), + PrimitiveType::Json => (DataType::Utf8, Some(ArroyoExtensionType::JSON)), }, - FieldType::Struct(s) => DataType::Struct(Fields::from( - s.fields - .into_iter() - .map(|t| t.into()) - .collect::>(), - )), - FieldType::List(t) => DataType::List(Arc::new((*t).into())), + FieldType::Struct(s) => ( + DataType::Struct(Fields::from( + s.fields + .into_iter() + .map(|t| t.into()) + .collect::>(), + )), + None, + ), + FieldType::List(t) => (DataType::List(Arc::new((*t).into())), None), }; - Field::new(f.field_name, t, f.nullable) + ArroyoExtensionType::add_metadata(ext, Field::new(f.field_name, t, f.nullable)) } } @@ -158,26 +166,31 @@ impl TryFrom for SourceField { type Error = String; fn try_from(f: Field) -> Result { - let field_type = match f.data_type() { - DataType::Boolean => FieldType::Primitive(PrimitiveType::Bool), - DataType::Int32 => FieldType::Primitive(PrimitiveType::Int32), - DataType::Int64 => FieldType::Primitive(PrimitiveType::Int64), - DataType::UInt32 => FieldType::Primitive(PrimitiveType::UInt32), - DataType::UInt64 => FieldType::Primitive(PrimitiveType::UInt64), - DataType::Float32 => FieldType::Primitive(PrimitiveType::F32), - DataType::Float64 => FieldType::Primitive(PrimitiveType::F64), - DataType::Binary | DataType::LargeBinary => FieldType::Primitive(PrimitiveType::Bytes), - DataType::Timestamp(TimeUnit::Millisecond, _) => { + let field_type = match (f.data_type(), ArroyoExtensionType::from_map(f.metadata())) { + (DataType::Boolean, None) => FieldType::Primitive(PrimitiveType::Bool), + (DataType::Int32, None) => FieldType::Primitive(PrimitiveType::Int32), + (DataType::Int64, None) => FieldType::Primitive(PrimitiveType::Int64), + (DataType::UInt32, None) => FieldType::Primitive(PrimitiveType::UInt32), + (DataType::UInt64, None) => FieldType::Primitive(PrimitiveType::UInt64), + (DataType::Float32, None) => FieldType::Primitive(PrimitiveType::F32), + (DataType::Float64, None) => FieldType::Primitive(PrimitiveType::F64), + (DataType::Binary, None) | (DataType::LargeBinary, None) => { + FieldType::Primitive(PrimitiveType::Bytes) + } + (DataType::Timestamp(TimeUnit::Millisecond, _), None) => { FieldType::Primitive(PrimitiveType::UnixMillis) } - DataType::Timestamp(TimeUnit::Microsecond, _) => { + (DataType::Timestamp(TimeUnit::Microsecond, _), None) => { FieldType::Primitive(PrimitiveType::UnixMicros) } - DataType::Timestamp(TimeUnit::Nanosecond, _) => { + (DataType::Timestamp(TimeUnit::Nanosecond, _), None) => { FieldType::Primitive(PrimitiveType::UnixNanos) } - DataType::Utf8 => FieldType::Primitive(PrimitiveType::String), - DataType::Struct(fields) => { + (DataType::Utf8, None) => FieldType::Primitive(PrimitiveType::String), + (DataType::Utf8, Some(ArroyoExtensionType::JSON)) => { + FieldType::Primitive(PrimitiveType::Json) + } + (DataType::Struct(fields), None) => { let fields: Result<_, String> = fields .into_iter() .map(|f| (**f).clone().try_into()) @@ -190,7 +203,7 @@ impl TryFrom for SourceField { FieldType::Struct(st) } - DataType::List(item) => FieldType::List(Box::new((**item).clone().try_into()?)), + (DataType::List(item), None) => FieldType::List(Box::new((**item).clone().try_into()?)), dt => { return Err(format!("Unsupported data type {:?}", dt)); } diff --git a/crates/arroyo-rpc/src/lib.rs b/crates/arroyo-rpc/src/lib.rs index 65abbf1f7..126c5492b 100644 --- a/crates/arroyo-rpc/src/lib.rs +++ b/crates/arroyo-rpc/src/lib.rs @@ -156,7 +156,7 @@ pub fn primitive_to_sql(primitive_type: PrimitiveType) -> &'static str { | PrimitiveType::UnixMicros | PrimitiveType::UnixNanos | PrimitiveType::DateTime => "TIMESTAMP", - PrimitiveType::Json => "JSONB", + PrimitiveType::Json => "JSON", } } diff --git a/crates/arroyo-types/src/lib.rs b/crates/arroyo-types/src/lib.rs index 062c97fe7..d872e2f04 100644 --- a/crates/arroyo-types/src/lib.rs +++ b/crates/arroyo-types/src/lib.rs @@ -629,7 +629,10 @@ pub struct RawJson { } pub fn raw_schema() -> Schema { - Schema::new(vec![Field::new("value", DataType::Utf8, false)]) + Schema::new(vec![ArroyoExtensionType::add_metadata( + Some(ArroyoExtensionType::JSON), + Field::new("value", DataType::Utf8, false), + )]) } pub static MESSAGES_RECV: &str = "arroyo_worker_messages_recv";