Fix TIMESTAMP to align with ORC impl
Jefffrey committed Mar 31, 2024
1 parent 3369b5d commit 60288cd
Showing 5 changed files with 173 additions and 71 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -19,6 +19,7 @@ rust-version = "1.70"
[dependencies]
arrow = { version = "50", features = ["prettyprint"] }
bytes = "1.4"
chrono = { version = "0.4.37", default-features = false, features = ["std"] }
chrono-tz = "0.8.6"
clap = { version = "4.5.3", features = ["derive"], optional = true }
fallible-streaming-iterator = { version = "0.1" }
33 changes: 22 additions & 11 deletions src/arrow_reader/column/timestamp.rs
@@ -1,22 +1,24 @@
use crate::error::Result;

// TIMESTAMP_BASE is 1 January 2015, the base value for all timestamp values.
// This records the number of seconds since 1 January 1970 (epoch) for the base,
// since Arrow uses the epoch as the base instead.
const TIMESTAMP_BASE_SECONDS_SINCE_EPOCH: i64 = 1_420_070_400;
const NANOSECONDS_IN_SECOND: i64 = 1_000_000_000;

pub struct TimestampIterator {
base_from_epoch: i64,
data: Box<dyn Iterator<Item = Result<i64>> + Send>,
secondary: Box<dyn Iterator<Item = Result<u64>> + Send>,
}

impl TimestampIterator {
pub fn new(
base_from_epoch: i64,
data: Box<dyn Iterator<Item = Result<i64>> + Send>,
secondary: Box<dyn Iterator<Item = Result<u64>> + Send>,
) -> Self {
Self { data, secondary }
Self {
base_from_epoch,
data,
secondary,
}
}
}

@@ -27,26 +29,35 @@ impl Iterator for TimestampIterator {
// TODO: throw error for mismatched stream lengths?
let (seconds_since_orc_base, nanoseconds) =
self.data.by_ref().zip(self.secondary.by_ref()).next()?;
decode_timestamp(seconds_since_orc_base, nanoseconds).transpose()
decode_timestamp(self.base_from_epoch, seconds_since_orc_base, nanoseconds).transpose()
}
}

fn decode_timestamp(
base: i64,
seconds_since_orc_base: Result<i64>,
nanoseconds: Result<u64>,
) -> Result<Option<i64>> {
let data = seconds_since_orc_base?;
let mut nanoseconds = nanoseconds?;
// last 3 bits indicate how many trailing zeros were truncated
// Last 3 bits indicate how many trailing zeros were truncated
let zeros = nanoseconds & 0x7;
nanoseconds >>= 3;
// multiply by powers of 10 to get back the trailing zeros
// Multiply by powers of 10 to get back the trailing zeros
if zeros != 0 {
nanoseconds *= 10_u64.pow(zeros as u32 + 1);
}
// convert into nanoseconds since epoch, which Arrow uses as native representation
let seconds_since_epoch = data + base;
// Timestamps below the UNIX epoch with nanoseconds > 999_999 need to be
// adjusted to have 1 second subtracted due to ORC-763:
// https://issues.apache.org/jira/browse/ORC-763
let seconds = if seconds_since_epoch < 0 && nanoseconds > 999_999 {
seconds_since_epoch - 1
} else {
seconds_since_epoch
};
// Convert into nanoseconds since epoch, which Arrow uses as native representation
// of timestamps
let nanoseconds_since_epoch =
(data + TIMESTAMP_BASE_SECONDS_SINCE_EPOCH) * NANOSECONDS_IN_SECOND + (nanoseconds as i64);
let nanoseconds_since_epoch = seconds * NANOSECONDS_IN_SECOND + (nanoseconds as i64);
Ok(Some(nanoseconds_since_epoch))
}
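
For reference, a minimal standalone sketch of that decode path for a single value, assuming the fixed UTC base of 1_420_070_400; the decode and main names here are hypothetical and only mirror decode_timestamp above:

fn decode(seconds_since_orc_base: i64, encoded_nanos: u64) -> i64 {
    // Bottom 3 bits record how many trailing zeros were truncated
    let zeros = encoded_nanos & 0x7;
    let mut nanos = encoded_nanos >> 3;
    if zeros != 0 {
        // Restore the truncated zeros by scaling back up
        nanos *= 10_u64.pow(zeros as u32 + 1);
    }
    // Shift from the ORC base (2015-01-01) to the UNIX epoch
    let seconds = seconds_since_orc_base + 1_420_070_400;
    // ORC-763 adjustment for pre-epoch timestamps
    let seconds = if seconds < 0 && nanos > 999_999 {
        seconds - 1
    } else {
        seconds
    };
    seconds * 1_000_000_000 + nanos as i64
}

fn main() {
    // Per the ORC spec, 123_000 ns (three trailing zeros) is stored as
    // (123 << 3) | 2 = 986; seconds_since_orc_base = 0 is the ORC base itself,
    // 2015-01-01T00:00:00Z.
    assert_eq!(decode(0, 986), 1_420_070_400 * 1_000_000_000 + 123_000);
}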
63 changes: 5 additions & 58 deletions src/arrow_reader/decoder/mod.rs
Expand Up @@ -5,7 +5,6 @@ use arrow::buffer::NullBuffer;
use arrow::datatypes::{ArrowPrimitiveType, Decimal128Type, UInt64Type};
use arrow::datatypes::{
Date32Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, SchemaRef,
TimestampNanosecondType,
};
use arrow::record_batch::RecordBatch;
use snafu::ResultExt;
@@ -24,15 +23,16 @@ use self::list::ListArrayDecoder;
use self::map::MapArrayDecoder;
use self::string::{new_binary_decoder, new_string_decoder};
use self::struct_decoder::StructArrayDecoder;
use self::timestamp::{new_timestamp_decoder, new_timestamp_instant_decoder};

use super::column::timestamp::TimestampIterator;
use super::column::{get_present_vec, Column};

mod decimal;
mod list;
mod map;
mod string;
mod struct_decoder;
mod timestamp;

struct PrimitiveArrayDecoder<T: ArrowPrimitiveType> {
iter: Box<dyn Iterator<Item = Result<T::Native>> + Send>,
@@ -105,37 +105,8 @@ type Int16ArrayDecoder = PrimitiveArrayDecoder<Int16Type>;
type Int8ArrayDecoder = PrimitiveArrayDecoder<Int8Type>;
type Float32ArrayDecoder = PrimitiveArrayDecoder<Float32Type>;
type Float64ArrayDecoder = PrimitiveArrayDecoder<Float64Type>;
type TimestampArrayDecoder = PrimitiveArrayDecoder<TimestampNanosecondType>;
type DateArrayDecoder = PrimitiveArrayDecoder<Date32Type>; // TODO: does ORC encode as i64 or i32?

/// Wrapper around TimestampArrayDecoder to allow specifying the timezone of the output
/// timestamp array
struct TimestampInstantArrayDecoder(TimestampArrayDecoder);

impl TimestampInstantArrayDecoder {
pub fn new(
iter: Box<dyn Iterator<Item = Result<i64>> + Send>,
present: Option<Box<dyn Iterator<Item = bool> + Send>>,
) -> Self {
Self(TimestampArrayDecoder::new(iter, present))
}
}

impl ArrayBatchDecoder for TimestampInstantArrayDecoder {
fn next_batch(
&mut self,
batch_size: usize,
parent_present: Option<&[bool]>,
) -> Result<ArrayRef> {
let array = self
.0
.next_primitive_batch(batch_size, parent_present)?
.with_timezone("UTC");
let array = Arc::new(array) as ArrayRef;
Ok(array)
}
}

/// Wrapper around PrimitiveArrayDecoder to allow specifying the precision and scale
/// of the output decimal array.
struct DecimalArrayDecoder {
@@ -404,35 +375,11 @@ pub fn array_decoder_factory(
DataType::Decimal {
precision, scale, ..
} => new_decimal_decoder(column, stripe, *precision, *scale)?,
DataType::Timestamp { .. } => {
// TODO: this needs to consider timezone
// TODO: here
let data = stripe.stream_map().get(column, Kind::Data);
let data = get_rle_reader(column, data)?;

let secondary = stripe.stream_map().get(column, Kind::Secondary);
let secondary = get_rle_reader(column, secondary)?;

let iter = Box::new(TimestampIterator::new(data, secondary));
let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);

Box::new(TimestampArrayDecoder::new(iter, present))
}
// TODO: duplicated with above
DataType::Timestamp { .. } => new_timestamp_decoder(column, stripe)?,
DataType::TimestampWithLocalTimezone { .. } => {
let data = stripe.stream_map().get(column, Kind::Data);
let data = get_rle_reader(column, data)?;

let secondary = stripe.stream_map().get(column, Kind::Secondary);
let secondary = get_rle_reader(column, secondary)?;

let iter = Box::new(TimestampIterator::new(data, secondary));
let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);

Box::new(TimestampInstantArrayDecoder::new(iter, present))
new_timestamp_instant_decoder(column, stripe)?
}

DataType::Date { .. } => {
let iter = stripe.stream_map().get(column, Kind::Data);
let iter = get_rle_reader(column, iter)?;
143 changes: 143 additions & 0 deletions src/arrow_reader/decoder/timestamp.rs
@@ -0,0 +1,143 @@
use std::sync::Arc;

use crate::{
arrow_reader::column::{get_present_vec, timestamp::TimestampIterator, Column},
error::Result,
proto::stream::Kind,
reader::decode::get_rle_reader,
stripe::Stripe,
};
use arrow::{array::ArrayRef, datatypes::TimestampNanosecondType};
use chrono::offset::TimeZone;

use super::{ArrayBatchDecoder, PrimitiveArrayDecoder};

/// Seconds from ORC epoch of 1 January 2015, which serves as the 0
/// point for all timestamp values, to the UNIX epoch of 1 January 1970.
const ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH: i64 = 1_420_070_400;

/// Decodes a TIMESTAMP column stripe into batches of TimestampNanosecondArrays with no
/// timezone. Will convert timestamps from writer timezone to UTC if a writer timezone
/// is specified for the stripe.
pub fn new_timestamp_decoder(
column: &Column,
stripe: &Stripe,
) -> Result<Box<dyn ArrayBatchDecoder>> {
let data = stripe.stream_map().get(column, Kind::Data);
let data = get_rle_reader(column, data)?;

let secondary = stripe.stream_map().get(column, Kind::Secondary);
let secondary = get_rle_reader(column, secondary)?;

let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);

match stripe.writer_tz() {
Some(tz) => {
// If a writer timezone exists then we must take the ORC epoch according
// to that timezone, and find its seconds since the UTC UNIX epoch for the base.
let seconds_since_unix_epoch = tz
.with_ymd_and_hms(2015, 1, 1, 0, 0, 0)
.unwrap()
.timestamp();
let iter = Box::new(TimestampIterator::new(
seconds_since_unix_epoch,
data,
secondary,
));
let decoder = RawTimestampArrayDecoder::new(iter, present);
Ok(Box::new(TimestampOffsetArrayDecoder {
inner: decoder,
writer_tz: tz,
}))
}
None => {
// No writer timezone, so we can assume UTC and use the known fixed value
// for the base offset.
let iter = Box::new(TimestampIterator::new(
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
data,
secondary,
));
let decoder = RawTimestampArrayDecoder::new(iter, present);
Ok(Box::new(decoder))
}
}
}
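
To make the writer-timezone branch concrete, a minimal sketch; the timezone below is an arbitrary example and not something the crate assumes:

use chrono::TimeZone;
use chrono_tz::Tz;

fn main() {
    // For a stripe written in Asia/Kolkata (UTC+05:30), the ORC base of
    // 2015-01-01 00:00:00 local time falls 5.5 hours before the UTC base,
    // so the computed base offset is smaller by 19_800 seconds.
    let tz: Tz = "Asia/Kolkata".parse().unwrap();
    let base = tz.with_ymd_and_hms(2015, 1, 1, 0, 0, 0).unwrap().timestamp();
    assert_eq!(base, 1_420_070_400 - 19_800);
}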

/// Decodes a TIMESTAMP_INSTANT column stripe into batches of TimestampNanosecondArrays with
/// UTC timezone.
pub fn new_timestamp_instant_decoder(
column: &Column,
stripe: &Stripe,
) -> Result<Box<dyn ArrayBatchDecoder>> {
let data = stripe.stream_map().get(column, Kind::Data);
let data = get_rle_reader(column, data)?;

let secondary = stripe.stream_map().get(column, Kind::Secondary);
let secondary = get_rle_reader(column, secondary)?;

let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);

// TIMESTAMP_INSTANT is encoded as UTC, so we don't check the writer timezone in the stripe
let iter = Box::new(TimestampIterator::new(
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
data,
secondary,
));

let decoder = RawTimestampArrayDecoder::new(iter, present);
Ok(Box::new(TimestampInstantArrayDecoder(decoder)))
}

type RawTimestampArrayDecoder = PrimitiveArrayDecoder<TimestampNanosecondType>;

/// Wrapper around RawTimestampArrayDecoder to decode timestamps which are encoded in
/// timezone of the writer to their UTC value.
struct TimestampOffsetArrayDecoder {
inner: RawTimestampArrayDecoder,
writer_tz: chrono_tz::Tz,
}

impl ArrayBatchDecoder for TimestampOffsetArrayDecoder {
fn next_batch(
&mut self,
batch_size: usize,
parent_present: Option<&[bool]>,
) -> Result<ArrayRef> {
let array = self
.inner
.next_primitive_batch(batch_size, parent_present)?;
let array = array.unary_opt::<_, TimestampNanosecondType>(|ts| {
// Convert from writer timezone to reader timezone (which we default to UTC)
// TODO: more efficient way of doing this?
self.writer_tz
.timestamp_nanos(ts)
.naive_local()
.and_utc()
.timestamp_nanos_opt()
});
let array = Arc::new(array) as ArrayRef;
Ok(array)
}
}
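
To see what that per-value conversion does, a minimal sketch using the same chrono method chain; the timezone is an arbitrary example and the numbers assume standard time, not DST:

use chrono::TimeZone;
use chrono_tz::Tz;

fn main() {
    let writer_tz: Tz = "America/New_York".parse().unwrap();
    // Instant for wall-clock 2015-01-01 00:00:00 in New York, i.e. 05:00:00 UTC
    let instant_nanos: i64 = (1_420_070_400 + 5 * 3600) * 1_000_000_000;
    // Re-express that instant as the writer's wall clock, encoded as UTC nanoseconds,
    // which is what the timezone-less Arrow array ends up holding
    let wall_clock_nanos = writer_tz
        .timestamp_nanos(instant_nanos)
        .naive_local()
        .and_utc()
        .timestamp_nanos_opt()
        .unwrap();
    assert_eq!(wall_clock_nanos, 1_420_070_400 * 1_000_000_000);
}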

/// Wrapper around RawTimestampArrayDecoder to allow specifying the timezone of the output
/// timestamp array as UTC.
struct TimestampInstantArrayDecoder(RawTimestampArrayDecoder);

impl ArrayBatchDecoder for TimestampInstantArrayDecoder {
fn next_batch(
&mut self,
batch_size: usize,
parent_present: Option<&[bool]>,
) -> Result<ArrayRef> {
let array = self
.0
.next_primitive_batch(batch_size, parent_present)?
.with_timezone("UTC");
let array = Arc::new(array) as ArrayRef;
Ok(array)
}
}
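
The instant wrapper's with_timezone("UTC") call only changes the Arrow data type, not the stored values; a minimal sketch, assuming the arrow 50 API pinned in Cargo.toml:

use arrow::array::{Array, TimestampNanosecondArray};
use arrow::datatypes::{DataType, TimeUnit};

fn main() {
    let array = TimestampNanosecondArray::from(vec![1_420_070_400_000_000_000_i64]);
    let array = array.with_timezone("UTC");
    // Values are unchanged; only the type now carries the "UTC" timezone
    assert_eq!(array.value(0), 1_420_070_400_000_000_000);
    assert_eq!(
        array.data_type(),
        &DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
    );
}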
4 changes: 2 additions & 2 deletions tests/integration/main.rs
@@ -153,13 +153,13 @@ fn empty_file() {
}

#[test]
#[ignore] // TODO: Incorrect timezone + representation differs
fn test_date_1900() {
test_expected_file("TestOrcFile.testDate1900");
}

#[test]
#[ignore] // TODO: Incorrect timezone + representation differs
#[ignore]
// TODO: pending https://github.com/chronotope/chrono-tz/issues/155
fn test_date_2038() {
test_expected_file("TestOrcFile.testDate2038");
}