Skip to content

Commit

Permalink
Revert "fix: Fixes datetime functions support (#34)"
Browse files Browse the repository at this point in the history
This reverts commit 4ffcbdc.
  • Loading branch information
philippemnoel authored Aug 12, 2024
1 parent 4ffcbdc commit 4796d50
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 106 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ serde = "1.0.201"
serde_json = "1.0.120"
signal-hook = "0.3.17"
signal-hook-async-std = "0.2.2"
shared = { git = "https://github.com/paradedb/paradedb.git", rev = "4a07043" }
shared = { git = "https://github.com/paradedb/paradedb.git", rev = "4854652" }
supabase-wrappers = { git = "https://github.com/paradedb/wrappers.git", default-features = false, rev = "6c58451" }
thiserror = "1.0.59"
uuid = "1.9.1"
Expand All @@ -42,7 +42,7 @@ futures = "0.3.30"
pgrx-tests = "0.11.3"
rstest = "0.19.0"
serde_arrow = { version = "0.11.3", features = ["arrow-51"] }
shared = { git = "https://github.com/paradedb/paradedb.git", rev = "4a07043", features = ["fixtures"] }
shared = { git = "https://github.com/paradedb/paradedb.git", rev = "4854652", features = ["fixtures"] }
sqlx = { version = "0.7.4", features = [
"postgres",
"runtime-async-std",
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ pub extern "C" fn _PG_init() {

GUCS.init("pg_analytics");

setup_telemetry_background_worker(ParadeExtension::PgAnalytics);
// TODO: Change to ParadeExtension::PgAnalytics
setup_telemetry_background_worker(ParadeExtension::PgLakehouse);
}

#[cfg(test)]
Expand Down
20 changes: 0 additions & 20 deletions src/schema/cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1088,14 +1088,6 @@ where
None => Ok(None),
}
}
DataType::Date32 => match self.get_date_value::<i32, Date32Type>(index)? {
Some(value) => Ok(Some(Cell::Date(value))),
None => Ok(None),
},
DataType::Date64 => match self.get_date_value::<i64, Date64Type>(index)? {
Some(value) => Ok(Some(Cell::Date(value))),
None => Ok(None),
},
unsupported => Err(DataTypeError::DataTypeMismatch(
name.to_string(),
unsupported.clone(),
Expand Down Expand Up @@ -1134,18 +1126,6 @@ where
None => Ok(None),
}
}
DataType::Date32 => {
match self.get_timestamptz_value::<TimestampSecondType>(index, None)? {
Some(value) => Ok(Some(Cell::Timestamptz(value))),
None => Ok(None),
}
}
DataType::Date64 => {
match self.get_timestamptz_value::<TimestampSecondType>(index, None)? {
Some(value) => Ok(Some(Cell::Timestamptz(value))),
None => Ok(None),
}
}
unsupported => Err(DataTypeError::DataTypeMismatch(
name.to_string(),
unsupported.clone(),
Expand Down
84 changes: 1 addition & 83 deletions tests/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,8 @@
mod fixtures;

use std::fs::File;
use std::sync::Arc;

use anyhow::Result;
use chrono::{DateTime, Datelike, TimeZone, Utc};
use datafusion::arrow::array::*;
use datafusion::arrow::datatypes::DataType;
use datafusion::parquet::arrow::ArrowWriter;
use deltalake::operations::create::CreateBuilder;
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
Expand All @@ -32,56 +28,19 @@ use rstest::*;
use shared::fixtures::arrow::{
delta_primitive_record_batch, primitive_record_batch, primitive_setup_fdw_local_file_delta,
primitive_setup_fdw_local_file_listing, primitive_setup_fdw_s3_delta,
primitive_setup_fdw_s3_listing, setup_fdw_local_parquet_file_listing, FieldSpec,
primitive_setup_fdw_s3_listing,
};
use shared::fixtures::tempfile::TempDir;
use sqlx::postgres::types::PgInterval;
use sqlx::types::{BigDecimal, Json, Uuid};
use sqlx::PgConnection;
use std::collections::HashMap;
use std::str::FromStr;
use temporal_conversions::SECONDS_IN_DAY;
use time::macros::{date, datetime, time};

const S3_TRIPS_BUCKET: &str = "test-trip-setup";
const S3_TRIPS_KEY: &str = "test_trip_setup.parquet";

fn date_time_record_batch() -> Result<(RecordBatch, FieldSpec, Vec<String>)> {
let field_spec = FieldSpec::from(vec![
("date32_col", DataType::Date32, false, "date"),
("date64_col", DataType::Date64, false, "date"),
]);
let dates = vec![
"2023-04-01 21:10:00 +0000".to_string(),
"2023-04-01 22:08:00 +0000".to_string(),
"2023-04-02 04:55:00 +0000".to_string(),
"2023-04-02 11:45:00 +0000".to_string(),
"2023-04-03 01:20:00 +0000".to_string(),
"2023-04-03 12:30:00 +0000".to_string(),
];
let (dates_i32, dates_i64): (Vec<_>, Vec<_>) = dates
.iter()
.map(|date_str| {
let dt = date_str.parse::<DateTime<Utc>>().unwrap();
(
(dt.timestamp() / SECONDS_IN_DAY) as i32,
dt.timestamp_millis(),
)
})
.unzip();

let schema = Arc::new(field_spec.arrow_schema());
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(Date32Array::from(dates_i32)),
Arc::new(Date64Array::from(dates_i64)),
],
)?;

Ok((batch, field_spec, dates))
}

#[rstest]
async fn test_trip_count(#[future(awt)] s3: S3, mut conn: PgConnection) -> Result<()> {
NycTripsTable::setup().execute(&mut conn);
Expand Down Expand Up @@ -327,44 +286,3 @@ async fn test_create_heap_from_parquet(mut conn: PgConnection, tempdir: TempDir)

Ok(())
}

#[rstest]
async fn test_date_functions_support_with_local_file(
mut conn: PgConnection,
tempdir: TempDir,
) -> Result<()> {
let (stored_batch, field_spec, dates) = date_time_record_batch()?;
let parquet_path = tempdir.path().join("test_date_functions.parquet");
let parquet_file = File::create(&parquet_path)?;

let mut writer = ArrowWriter::try_new(parquet_file, stored_batch.schema(), None).unwrap();
writer.write(&stored_batch)?;
writer.close()?;

setup_fdw_local_parquet_file_listing(
parquet_path.as_path().to_str().unwrap(),
"dates",
&field_spec.postgres_schema(),
)
.execute(&mut conn);

let expected_rows: Vec<(f64, DateTime<Utc>)> = dates
.iter()
.map(|date_str| {
let dt = date_str.parse::<DateTime<Utc>>().unwrap();
(
dt.day() as f64,
Utc.with_ymd_and_hms(dt.year(), dt.month(), dt.day(), 0, 0, 0)
.unwrap(),
)
})
.collect();

let fetched_rows =
"SELECT DATE_PART('day', date32_col), DATE_TRUNC('day', date64_col) FROM dates"
.fetch_result::<(f64, chrono::DateTime<Utc>)>(&mut conn)?;
assert_eq!(expected_rows.len(), fetched_rows.len());
assert_eq!(expected_rows, fetched_rows);

Ok(())
}

0 comments on commit 4796d50

Please sign in to comment.