Skip to content

Commit

Permalink
fix & remove unnecessary conversion based on PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
evanxg852000 committed Aug 8, 2024
1 parent 06abfb2 commit cbf395d
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 73 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ serde = "1.0.201"
serde_json = "1.0.120"
signal-hook = "0.3.17"
signal-hook-async-std = "0.2.2"
shared = { git = "https://github.com/paradedb/paradedb.git", branch = "add-util-record-batches" }
shared = { git = "https://github.com/paradedb/paradedb.git", rev = "4a07043" }
supabase-wrappers = { git = "https://github.com/paradedb/wrappers.git", default-features = false, rev = "6c58451" }
thiserror = "1.0.59"
uuid = "1.9.1"
Expand All @@ -42,7 +42,7 @@ futures = "0.3.30"
pgrx-tests = "0.11.3"
rstest = "0.19.0"
serde_arrow = { version = "0.11.3", features = ["arrow-51"] }
shared = { git = "https://github.com/paradedb/paradedb.git", branch = "add-util-record-batches", features = ["fixtures"] }
shared = { git = "https://github.com/paradedb/paradedb.git", rev = "4a07043", features = ["fixtures"] }
sqlx = { version = "0.7.4", features = [
"postgres",
"runtime-async-std",
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub extern "C" fn _PG_init() {
GUCS.init("pg_analytics");

// TODO: Change to ParadeExtension::PgAnalytics
setup_telemetry_background_worker(ParadeExtension::PgLakehouse);
setup_telemetry_background_worker(ParadeExtension::PgAnalytics);
}

#[cfg(test)]
Expand Down
45 changes: 17 additions & 28 deletions src/schema/cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ use duckdb::arrow::array::types::{
};
use duckdb::arrow::array::{
timezone::Tz, Array, ArrayAccessor, ArrayRef, ArrowPrimitiveType, AsArray, BinaryArray,
BooleanArray, Date32Array, Date64Array, Decimal128Array, Float16Array, Float32Array,
Float64Array, GenericByteArray, Int16Array, Int32Array, Int64Array, Int8Array,
LargeBinaryArray, StringArray,
BooleanArray, Decimal128Array, Float16Array, Float32Array, Float64Array, GenericByteArray,
Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, StringArray,
};
use duckdb::arrow::datatypes::{DataType, DecimalType, GenericStringType, IntervalUnit, TimeUnit};
use pgrx::*;
Expand Down Expand Up @@ -1089,20 +1088,12 @@ where
None => Ok(None),
}
}
DataType::Date32 => match self.get_primitive_value::<Date32Array>(index)? {
Some(timestamp_in_days) => {
Ok(arrow_date32_to_postgres_timestamps(timestamp_in_days)?
.map(Timestamp::from)
.map(Cell::Timestamp))
}
DataType::Date32 => match self.get_date_value::<i32, Date32Type>(index)? {
Some(value) => Ok(Some(Cell::Date(value))),
None => Ok(None),
},
DataType::Date64 => match self.get_primitive_value::<Date64Array>(index)? {
Some(timestamp_in_milliseconds) => Ok(arrow_date64_to_postgres_timestamps(
timestamp_in_milliseconds,
)?
.map(Timestamp::from)
.map(Cell::Timestamp)),
DataType::Date64 => match self.get_date_value::<i64, Date64Type>(index)? {
Some(value) => Ok(Some(Cell::Date(value))),
None => Ok(None),
},
unsupported => Err(DataTypeError::DataTypeMismatch(
Expand Down Expand Up @@ -1143,20 +1134,18 @@ where
None => Ok(None),
}
}
DataType::Date32 => match self.get_primitive_value::<Date32Array>(index)? {
Some(timestamp_in_days) => {
Ok(arrow_date32_to_postgres_timestamps(timestamp_in_days)?
.map(Cell::Timestamptz))
DataType::Date32 => {
match self.get_timestamptz_value::<TimestampSecondType>(index, None)? {
Some(value) => Ok(Some(Cell::Timestamptz(value))),
None => Ok(None),
}
None => Ok(None),
},
DataType::Date64 => match self.get_primitive_value::<Date64Array>(index)? {
Some(timestamp_in_milliseconds) => Ok(arrow_date64_to_postgres_timestamps(
timestamp_in_milliseconds,
)?
.map(Cell::Timestamptz)),
None => Ok(None),
},
}
DataType::Date64 => {
match self.get_timestamptz_value::<TimestampSecondType>(index, None)? {
Some(value) => Ok(Some(Cell::Timestamptz(value))),
None => Ok(None),
}
}
unsupported => Err(DataTypeError::DataTypeMismatch(
name.to_string(),
unsupported.clone(),
Expand Down
43 changes: 1 addition & 42 deletions src/schema/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use chrono::{
DateTime, Datelike, Days, NaiveDate, NaiveDateTime, NaiveTime, TimeDelta, TimeZone, Timelike,
DateTime, Datelike, NaiveDate, NaiveDateTime, NaiveTime, TimeDelta, TimeZone, Timelike,
};
use pgrx::*;
use std::fmt::Debug;
Expand All @@ -25,47 +25,6 @@ use std::str::FromStr;

const NANOSECONDS_IN_SECOND: u32 = 1_000_000_000;

// Number of milliseconds in one second.
const MILLISECONDS_IN_SECOND: i64 = 1_000;

// Number of seconds in one day.
const SECONDS_IN_DAY: i64 = 86_400;

// Number of days between Apache Arrow / UNIX epoch (1970-01-01)
// and PostgreSQL epoch (2000-01-01).
const POSTGRES_BASE_DATE_OFFSET: Days = Days::new(10_957);

/// Maps a raw [`arrow::array::types::Date32Type`] value onto a PostgreSQL
/// [`datum::TimestampWithTimeZone`].
///
/// The input counts days elapsed since the UNIX epoch (1970-01-01). It is widened to
/// milliseconds and handed to [`arrow_date64_to_postgres_timestamps`], which performs the
/// shift to the PostgreSQL epoch (2000-01-01) and the conversion to microseconds.
#[inline(always)]
pub(crate) fn arrow_date32_to_postgres_timestamps(
    timestamp_in_days: i32,
) -> Result<Option<TimestampWithTimeZone>, FromTimeError> {
    // Widen before multiplying so the day count is scaled in 64-bit arithmetic.
    let millis_since_unix_epoch =
        i64::from(timestamp_in_days) * SECONDS_IN_DAY * MILLISECONDS_IN_SECOND;
    arrow_date64_to_postgres_timestamps(millis_since_unix_epoch)
}

/// Maps a raw [`arrow::array::types::Date64Type`] value onto a PostgreSQL
/// [`datum::TimestampWithTimeZone`].
///
/// The input counts milliseconds elapsed since the UNIX epoch (1970-01-01), whereas the
/// PostgreSQL timestamp is built from microseconds relative to 2000-01-01. The value is
/// therefore shifted back by [`POSTGRES_BASE_DATE_OFFSET`] before being re-expressed in
/// microseconds.
///
/// Returns `Ok(None)` when the input cannot be turned into a [`DateTime`] or when the
/// epoch shift fails, and `Err` when [`TimestampWithTimeZone::try_from`] rejects the
/// resulting microsecond count.
#[inline(always)]
pub(crate) fn arrow_date64_to_postgres_timestamps(
    timestamp_in_milliseconds: i64,
) -> Result<Option<TimestampWithTimeZone>, FromTimeError> {
    // Interpret the input against the UNIX epoch, then move it onto the PostgreSQL epoch.
    let postgres_relative = DateTime::from_timestamp_millis(timestamp_in_milliseconds)
        .and_then(|unix_date_time| {
            unix_date_time
                .naive_utc()
                .checked_sub_days(POSTGRES_BASE_DATE_OFFSET)
        });

    match postgres_relative {
        Some(shifted) => {
            let micros = shifted.and_utc().timestamp_micros();
            TimestampWithTimeZone::try_from(micros).map(Some)
        }
        None => Ok(None),
    }
}

/// Newtype wrapper around a [`NaiveDate`].
#[derive(Clone, Debug)]
pub struct Date(pub NaiveDate);

Expand Down

0 comments on commit cbf395d

Please sign in to comment.