diff --git a/docs/bigquery.md b/docs/bigquery.md index 9d15df35..56d0c5d5 100644 --- a/docs/bigquery.md +++ b/docs/bigquery.md @@ -15,6 +15,7 @@ The BigQuery Wrapper allows you to read and write data from BigQuery within your | date | DATE | | timestamp | DATETIME | | timestamp | TIMESTAMP | +| timestamptz | TIMESTAMP | ## Preparation diff --git a/docs/mssql.md b/docs/mssql.md index e9dea105..c07094d9 100644 --- a/docs/mssql.md +++ b/docs/mssql.md @@ -17,6 +17,7 @@ The SQL Server Wrapper allows you to read data from Microsoft SQL Server within | text | varchar/char/text | | date | date | | timestamp | datetime/datetime2/smalldatetime | +| timestamptz | datetime/datetime2/smalldatetime | ## Preparation diff --git a/docs/s3.md b/docs/s3.md index efb69773..e19cb837 100644 --- a/docs/s3.md +++ b/docs/s3.md @@ -34,6 +34,7 @@ The S3 Wrapper uses Parquet file data types from [arrow_array::types](https://do | text | ByteArrayType | | date | Date64Type | | timestamp | TimestampNanosecondType | +| timestamptz | TimestampNanosecondType | ## Preparation diff --git a/supabase-wrappers/src/interface.rs b/supabase-wrappers/src/interface.rs index 5969dbad..e545cb53 100644 --- a/supabase-wrappers/src/interface.rs +++ b/supabase-wrappers/src/interface.rs @@ -3,7 +3,7 @@ use crate::FdwRoutine; use pgrx::pg_sys::panic::ErrorReport; -use pgrx::prelude::{Date, Timestamp}; +use pgrx::prelude::{Date, Timestamp, TimestampWithTimeZone}; use pgrx::{ fcinfo, pg_sys::{self, BuiltinOid, Datum, Oid}, @@ -44,6 +44,7 @@ pub enum Cell { String(String), Date(Date), Timestamp(Timestamp), + Timestamptz(TimestampWithTimeZone), Json(JsonB), } @@ -61,6 +62,7 @@ impl Clone for Cell { Cell::String(v) => Cell::String(v.clone()), Cell::Date(v) => Cell::Date(*v), Cell::Timestamp(v) => Cell::Timestamp(*v), + Cell::Timestamptz(v) => Cell::Timestamptz(*v), Cell::Json(v) => Cell::Json(JsonB(v.0.clone())), } } @@ -81,18 +83,43 @@ impl fmt::Display for Cell { Cell::Date(v) => unsafe { let dt = fcinfo::direct_function_call_as_datum(pg_sys::date_out, &[(*v).into_datum()]) - .unwrap(); + .expect("cell should be a valid date"); let dt_cstr = CStr::from_ptr(dt.cast_mut_ptr()); - write!(f, "'{}'", dt_cstr.to_str().unwrap()) + write!( + f, + "'{}'", + dt_cstr.to_str().expect("date should be a valid string") + ) }, Cell::Timestamp(v) => unsafe { let ts = fcinfo::direct_function_call_as_datum( pg_sys::timestamp_out, &[(*v).into_datum()], ) - .unwrap(); + .expect("cell should be a valid timestamp"); + let ts_cstr = CStr::from_ptr(ts.cast_mut_ptr()); + write!( + f, + "'{}'", + ts_cstr + .to_str() + .expect("timestamp should be a valid string") + ) + }, + Cell::Timestamptz(v) => unsafe { + let ts = fcinfo::direct_function_call_as_datum( + pg_sys::timestamptz_out, + &[(*v).into_datum()], + ) + .expect("cell should be a valid timestamptz"); let ts_cstr = CStr::from_ptr(ts.cast_mut_ptr()); - write!(f, "'{}'", ts_cstr.to_str().unwrap()) + write!( + f, + "'{}'", + ts_cstr + .to_str() + .expect("timestamptz should be a valid string") + ) }, Cell::Json(v) => write!(f, "{:?}", v), } @@ -113,6 +140,7 @@ impl IntoDatum for Cell { Cell::String(v) => v.into_datum(), Cell::Date(v) => v.into_datum(), Cell::Timestamp(v) => v.into_datum(), + Cell::Timestamptz(v) => v.into_datum(), Cell::Json(v) => v.into_datum(), } } @@ -134,6 +162,7 @@ impl IntoDatum for Cell { || other == pg_sys::TEXTOID || other == pg_sys::DATEOID || other == pg_sys::TIMESTAMPOID + || other == pg_sys::TIMESTAMPTZOID || other == pg_sys::JSONBOID } } @@ -143,46 +172,44 @@ impl FromDatum for Cell { where Self: Sized, { - if is_null { - return None; - } let oid = PgOid::from(typoid); match oid { PgOid::BuiltIn(PgBuiltInOids::BOOLOID) => { - Some(Cell::Bool(bool::from_datum(datum, false).unwrap())) - } - PgOid::BuiltIn(PgBuiltInOids::CHAROID) => { - Some(Cell::I8(i8::from_datum(datum, false).unwrap())) + bool::from_datum(datum, is_null).map(Cell::Bool) } + PgOid::BuiltIn(PgBuiltInOids::CHAROID) => i8::from_datum(datum, is_null).map(Cell::I8), PgOid::BuiltIn(PgBuiltInOids::INT2OID) => { - Some(Cell::I16(i16::from_datum(datum, false).unwrap())) + i16::from_datum(datum, is_null).map(Cell::I16) } PgOid::BuiltIn(PgBuiltInOids::FLOAT4OID) => { - Some(Cell::F32(f32::from_datum(datum, false).unwrap())) + f32::from_datum(datum, is_null).map(Cell::F32) } PgOid::BuiltIn(PgBuiltInOids::INT4OID) => { - Some(Cell::I32(i32::from_datum(datum, false).unwrap())) + i32::from_datum(datum, is_null).map(Cell::I32) } PgOid::BuiltIn(PgBuiltInOids::FLOAT8OID) => { - Some(Cell::F64(f64::from_datum(datum, false).unwrap())) + f64::from_datum(datum, is_null).map(Cell::F64) } PgOid::BuiltIn(PgBuiltInOids::INT8OID) => { - Some(Cell::I64(i64::from_datum(datum, false).unwrap())) + i64::from_datum(datum, is_null).map(Cell::I64) } PgOid::BuiltIn(PgBuiltInOids::NUMERICOID) => { - Some(Cell::Numeric(AnyNumeric::from_datum(datum, false).unwrap())) + AnyNumeric::from_datum(datum, is_null).map(Cell::Numeric) } PgOid::BuiltIn(PgBuiltInOids::TEXTOID) => { - Some(Cell::String(String::from_datum(datum, false).unwrap())) + String::from_datum(datum, is_null).map(Cell::String) } PgOid::BuiltIn(PgBuiltInOids::DATEOID) => { - Some(Cell::Date(Date::from_datum(datum, false).unwrap())) + Date::from_datum(datum, is_null).map(Cell::Date) + } + PgOid::BuiltIn(PgBuiltInOids::TIMESTAMPOID) => { + Timestamp::from_datum(datum, is_null).map(Cell::Timestamp) + } + PgOid::BuiltIn(PgBuiltInOids::TIMESTAMPTZOID) => { + TimestampWithTimeZone::from_datum(datum, is_null).map(Cell::Timestamptz) } - PgOid::BuiltIn(PgBuiltInOids::TIMESTAMPOID) => Some(Cell::Timestamp( - Timestamp::from_datum(datum, false).unwrap(), - )), PgOid::BuiltIn(PgBuiltInOids::JSONBOID) => { - Some(Cell::Json(JsonB::from_datum(datum, false).unwrap())) + JsonB::from_datum(datum, is_null).map(Cell::Json) } _ => None, } @@ -226,9 +253,9 @@ impl Row { { let keep: Vec = self.iter().map(f).collect(); let mut iter = keep.iter(); - self.cols.retain(|_| *iter.next().unwrap()); + self.cols.retain(|_| *iter.next().unwrap_or(&true)); iter = keep.iter(); - self.cells.retain(|_| *iter.next().unwrap()); + self.cells.retain(|_| *iter.next().unwrap_or(&true)); } /// Replace `self` with the source row diff --git a/wrappers/src/fdw/airtable_fdw/result.rs b/wrappers/src/fdw/airtable_fdw/result.rs index 15364aa2..fb6c527d 100644 --- a/wrappers/src/fdw/airtable_fdw/result.rs +++ b/wrappers/src/fdw/airtable_fdw/result.rs @@ -213,6 +213,19 @@ impl AirtableRecord { } }, ), + pg_sys::TIMESTAMPTZOID => self.fields.0.get(&col.name).map_or_else( + || Ok(None), + |val| { + if let Value::String(v) = val { + let n = pgrx::TimestampWithTimeZone::from_str(v.as_str()) + .ok() + .map(Cell::Timestamptz); + Ok(n) + } else { + Err(()) + } + }, + ), // TODO: Think about adding support for BOOLARRAYOID, NUMERICARRAYOID, TEXTARRAYOID and rest of array types. pg_sys::JSONBOID => self.fields.0.get(&col.name).map_or_else( || Ok(None), diff --git a/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs b/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs index 5478f2d0..f772745f 100644 --- a/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs +++ b/wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs @@ -393,6 +393,7 @@ impl ForeignDataWrapper for BigQueryFdw { Cell::String(v) => row_json[col_name] = json!(v), Cell::Date(v) => row_json[col_name] = json!(v), Cell::Timestamp(v) => row_json[col_name] = json!(v), + Cell::Timestamptz(v) => row_json[col_name] = json!(v), Cell::Json(v) => row_json[col_name] = json!(v), } } diff --git a/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs b/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs index 43fabd87..7550764b 100644 --- a/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs +++ b/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs @@ -1,7 +1,7 @@ use crate::stats; use pgrx::{ pg_sys, - prelude::{AnyNumeric, Date, Timestamp}, + prelude::{AnyNumeric, Date, Timestamp, TimestampWithTimeZone}, }; use reqwest::{ self, @@ -75,6 +75,10 @@ fn json_value_to_cell(tgt_col: &Column, v: &JsonValue) -> LogflareFdwResult v + .as_str() + .and_then(|s| TimestampWithTimeZone::from_str(s).ok()) + .map(Cell::Timestamptz), _ => { return Err(LogflareFdwError::UnsupportedColumnType( tgt_col.name.clone(), @@ -118,6 +122,9 @@ impl LogflareFdw { Cell::String(s) => s.clone(), Cell::Date(d) => d.to_string().as_str().trim_matches('\'').to_owned(), Cell::Timestamp(t) => t.to_string().as_str().trim_matches('\'').to_owned(), + Cell::Timestamptz(t) => { + t.to_string().as_str().trim_matches('\'').to_owned() + } _ => cell.to_string(), }; url.query_pairs_mut().append_pair(param_name, &value); diff --git a/wrappers/src/fdw/mssql_fdw/mssql_fdw.rs b/wrappers/src/fdw/mssql_fdw/mssql_fdw.rs index 956871a6..e3afe7ba 100644 --- a/wrappers/src/fdw/mssql_fdw/mssql_fdw.rs +++ b/wrappers/src/fdw/mssql_fdw/mssql_fdw.rs @@ -66,6 +66,12 @@ fn field_to_cell(src_row: &tiberius::Row, tgt_col: &Column) -> MssqlFdwResult { + src_row.try_get::(col_name)?.map(|v| { + let ts = to_timestamp(v.timestamp() as f64); + Cell::Timestamptz(ts) + }) + } _ => { return Err(MssqlFdwError::UnsupportedColumnType(tgt_col.name.clone())); } diff --git a/wrappers/src/fdw/s3_fdw/parquet.rs b/wrappers/src/fdw/s3_fdw/parquet.rs index 3ca7289b..bac4f597 100644 --- a/wrappers/src/fdw/s3_fdw/parquet.rs +++ b/wrappers/src/fdw/s3_fdw/parquet.rs @@ -339,6 +339,20 @@ impl S3Parquet { }) } } + pg_sys::TIMESTAMPTZOID => { + let arr = col + .as_any() + .downcast_ref::() + .ok_or(S3FdwError::ColumnTypeNotMatch(tgt_col.name.clone()))?; + if arr.is_null(self.batch_idx) { + None + } else { + arr.value_as_datetime(self.batch_idx).map(|ts| { + let ts = to_timestamp(ts.timestamp() as f64); + Cell::Timestamptz(ts) + }) + } + } _ => return Err(S3FdwError::UnsupportedColumnType(tgt_col.name.clone())), }; row.push(&tgt_col.name, cell);