Support timezones in CSV reader (#3841) (#3908)
tustvold authored Mar 23, 2023
1 parent e4e6c67 commit 6af2bf6
Showing 1 changed file with 115 additions and 77 deletions.
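
For context (this example is not part of the commit), a minimal sketch of how a downstream user might exercise the new behavior, using the same `ReaderBuilder`/`Decoder` API that the tests in this diff use. The column name `ts`, the CSV values, and the `+08:00` offset are made up for illustration; with a timezone on the field, offset-less strings are interpreted in that zone rather than as UTC, which is what the new tests assert.

    use std::sync::Arc;

    use arrow_array::cast::AsArray;
    use arrow_array::types::TimestampMicrosecondType;
    use arrow_csv::ReaderBuilder;
    use arrow_schema::{DataType, Field, Schema, TimeUnit};

    fn main() {
        // Hypothetical input: one timestamp column, with and without explicit offsets.
        let csv = "2023-03-23T00:00:00\n2023-03-23T00:00:00+05:00\n";

        // Declaring the column as Timestamp(.., Some("+08:00")) makes the reader
        // interpret offset-less values in that timezone; before this change the
        // timezone string on the field was ignored during parsing.
        let schema = Arc::new(Schema::new(vec![Field::new(
            "ts",
            DataType::Timestamp(TimeUnit::Microsecond, Some("+08:00".to_string())),
            true,
        )]));

        let mut decoder = ReaderBuilder::new().with_schema(schema).build_decoder();
        decoder.decode(csv.as_bytes()).unwrap();
        decoder.decode(&[]).unwrap(); // signal end of input
        let batch = decoder.flush().unwrap().unwrap();

        let col = batch.column(0).as_primitive::<TimestampMicrosecondType>();
        println!("{:?}", col.values()); // epoch microseconds, adjusted for each offset
    }
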
192 changes: 115 additions & 77 deletions arrow-csv/src/reader/mod.rs
@@ -45,8 +45,9 @@ mod records;
 use arrow_array::builder::PrimitiveBuilder;
 use arrow_array::types::*;
 use arrow_array::*;
-use arrow_cast::parse::{parse_decimal, Parser};
+use arrow_cast::parse::{parse_decimal, string_to_datetime, Parser};
 use arrow_schema::*;
+use chrono::{TimeZone, Utc};
 use lazy_static::lazy_static;
 use regex::{Regex, RegexSet};
 use std::fmt;
@@ -56,6 +57,7 @@ use std::sync::Arc;
 
 use crate::map_csv_error;
 use crate::reader::records::{RecordDecoder, StringRecords};
+use arrow_array::timezone::Tz;
 use csv::StringRecord;
 
 lazy_static! {
@@ -677,33 +679,36 @@ fn parse(
             >(
                 line_number, rows, i, None
             ),
-            DataType::Timestamp(TimeUnit::Second, _) => build_primitive_array::<
-                TimestampSecondType,
-            >(
-                line_number, rows, i, None
-            ),
-            DataType::Timestamp(TimeUnit::Millisecond, _) => {
-                build_primitive_array::<TimestampMillisecondType>(
+            DataType::Timestamp(TimeUnit::Second, tz) => {
+                build_timestamp_array::<TimestampSecondType>(
                     line_number,
                     rows,
                     i,
-                    None,
+                    tz.as_deref(),
                 )
             }
-            DataType::Timestamp(TimeUnit::Microsecond, _) => {
-                build_primitive_array::<TimestampMicrosecondType>(
+            DataType::Timestamp(TimeUnit::Millisecond, tz) => {
+                build_timestamp_array::<TimestampMillisecondType>(
                     line_number,
                     rows,
                     i,
-                    None,
+                    tz.as_deref(),
                 )
             }
-            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
-                build_primitive_array::<TimestampNanosecondType>(
+            DataType::Timestamp(TimeUnit::Microsecond, tz) => {
+                build_timestamp_array::<TimestampMicrosecondType>(
                     line_number,
                     rows,
                     i,
-                    None,
+                    tz.as_deref(),
                 )
             }
+            DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
+                build_timestamp_array::<TimestampNanosecondType>(
+                    line_number,
+                    rows,
+                    i,
+                    tz.as_deref(),
+                )
+            }
             DataType::Utf8 => Ok(Arc::new(
Expand Down Expand Up @@ -871,6 +876,54 @@ fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
.map(|e| Arc::new(e) as ArrayRef)
}

fn build_timestamp_array<T: ArrowTimestampType>(
line_number: usize,
rows: &StringRecords<'_>,
col_idx: usize,
timezone: Option<&str>,
) -> Result<ArrayRef, ArrowError> {
Ok(Arc::new(match timezone {
Some(timezone) => {
let tz: Tz = timezone.parse()?;
build_timestamp_array_impl::<T, _>(line_number, rows, col_idx, &tz)?
.with_timezone(timezone)
}
None => build_timestamp_array_impl::<T, _>(line_number, rows, col_idx, &Utc)?,
}))
}

fn build_timestamp_array_impl<T: ArrowTimestampType, Tz: TimeZone>(
line_number: usize,
rows: &StringRecords<'_>,
col_idx: usize,
timezone: &Tz,
) -> Result<PrimitiveArray<T>, ArrowError> {
rows.iter()
.enumerate()
.map(|(row_index, row)| {
let s = row.get(col_idx);
if s.is_empty() {
return Ok(None);
}

let date = string_to_datetime(timezone, s).map_err(|e| {
ArrowError::ParseError(format!(
"Error parsing column {col_idx} at line {}: {}",
line_number + row_index,
e
))
})?;

Ok(Some(match T::UNIT {
TimeUnit::Second => date.timestamp(),
TimeUnit::Millisecond => date.timestamp_millis(),
TimeUnit::Microsecond => date.timestamp_micros(),
TimeUnit::Nanosecond => date.timestamp_nanos(),
}))
})
.collect()
}

// parses a specific column (col_idx) into an Arrow Array.
fn build_boolean_array(
line_number: usize,
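
A side note on the semantics of `build_timestamp_array_impl` above: for a string without an explicit offset, `string_to_datetime` interprets the wall-clock time in the column's timezone, and the stored value is the UTC epoch value in the requested unit. A minimal chrono-only sketch of that arithmetic follows; it is an illustration, not the actual code path (which goes through `arrow_array::timezone::Tz`), and the `-05:00` offset is taken from the new tests.

    use chrono::{FixedOffset, TimeZone};

    fn main() {
        // Interpret 1970-01-01T00:00:00 as wall-clock time in -05:00,
        // then take the UTC epoch value in seconds.
        let tz = FixedOffset::west_opt(5 * 3600).unwrap(); // "-05:00"
        let dt = tz.with_ymd_and_hms(1970, 1, 1, 0, 0, 0).unwrap();
        assert_eq!(dt.timestamp(), 18_000); // matches the TimestampSecondType expectation
    }
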
@@ -1147,7 +1200,6 @@ mod tests {
     use tempfile::NamedTempFile;
 
     use arrow_array::cast::AsArray;
-    use chrono::prelude::*;
 
     #[test]
     fn test_csv() {
@@ -1686,75 +1738,61 @@ mod tests {
         );
     }
 
-    #[test]
-    fn test_parse_timestamp_microseconds() {
-        assert_eq!(
-            parse_item::<TimestampMicrosecondType>("1970-01-01T00:00:00Z").unwrap(),
-            0
-        );
-        let naive_datetime = NaiveDateTime::new(
-            NaiveDate::from_ymd_opt(2018, 11, 13).unwrap(),
-            NaiveTime::from_hms_nano_opt(17, 11, 10, 0).unwrap(),
-        );
-        assert_eq!(
-            parse_item::<TimestampMicrosecondType>("2018-11-13T17:11:10").unwrap(),
-            naive_datetime.timestamp_nanos() / 1000
-        );
-        assert_eq!(
-            parse_item::<TimestampMicrosecondType>("2018-11-13 17:11:10").unwrap(),
-            naive_datetime.timestamp_nanos() / 1000
-        );
-        let naive_datetime = NaiveDateTime::new(
-            NaiveDate::from_ymd_opt(2018, 11, 13).unwrap(),
-            NaiveTime::from_hms_nano_opt(17, 11, 10, 11000000).unwrap(),
-        );
-        assert_eq!(
-            parse_item::<TimestampMicrosecondType>("2018-11-13T17:11:10.011").unwrap(),
-            naive_datetime.timestamp_nanos() / 1000
-        );
-        let naive_datetime = NaiveDateTime::new(
-            NaiveDate::from_ymd_opt(1900, 2, 28).unwrap(),
-            NaiveTime::from_hms_nano_opt(12, 34, 56, 0).unwrap(),
-        );
-        assert_eq!(
-            parse_item::<TimestampMicrosecondType>("1900-02-28T12:34:56").unwrap(),
-            naive_datetime.timestamp_nanos() / 1000
-        );
-    }
-
-    #[test]
-    fn test_parse_timestamp_nanoseconds() {
-        assert_eq!(
-            parse_item::<TimestampNanosecondType>("1970-01-01T00:00:00Z").unwrap(),
-            0
-        );
-        let naive_datetime = NaiveDateTime::new(
-            NaiveDate::from_ymd_opt(2018, 11, 13).unwrap(),
-            NaiveTime::from_hms_nano_opt(17, 11, 10, 0).unwrap(),
-        );
-        assert_eq!(
-            parse_item::<TimestampNanosecondType>("2018-11-13T17:11:10").unwrap(),
-            naive_datetime.timestamp_nanos()
-        );
-        assert_eq!(
-            parse_item::<TimestampNanosecondType>("2018-11-13 17:11:10").unwrap(),
-            naive_datetime.timestamp_nanos()
-        );
-        let naive_datetime = NaiveDateTime::new(
-            NaiveDate::from_ymd_opt(2018, 11, 13).unwrap(),
-            NaiveTime::from_hms_nano_opt(17, 11, 10, 11000000).unwrap(),
-        );
-        assert_eq!(
-            parse_item::<TimestampNanosecondType>("2018-11-13T17:11:10.011").unwrap(),
-            naive_datetime.timestamp_nanos()
-        );
-        let naive_datetime = NaiveDateTime::new(
-            NaiveDate::from_ymd_opt(1900, 2, 28).unwrap(),
-            NaiveTime::from_hms_nano_opt(12, 34, 56, 0).unwrap(),
-        );
-        assert_eq!(
-            parse_item::<TimestampNanosecondType>("1900-02-28T12:34:56").unwrap(),
-            naive_datetime.timestamp_nanos()
-        );
-    }
+    fn test_parse_timestamp_impl<T: ArrowTimestampType>(
+        timezone: Option<String>,
+        expected: &[i64],
+    ) {
+        let csv = [
+            "1970-01-01T00:00:00",
+            "1970-01-01T00:00:00Z",
+            "1970-01-01T00:00:00+02:00",
+        ]
+        .join("\n");
+        let mut decoder = ReaderBuilder::new()
+            .with_schema(Arc::new(Schema::new(vec![Field::new(
+                "field",
+                DataType::Timestamp(T::UNIT, timezone.clone()),
+                true,
+            )])))
+            .build_decoder();
+
+        let decoded = decoder.decode(csv.as_bytes()).unwrap();
+        assert_eq!(decoded, csv.len());
+        decoder.decode(&[]).unwrap();
+
+        let batch = decoder.flush().unwrap().unwrap();
+        assert_eq!(batch.num_columns(), 1);
+        assert_eq!(batch.num_rows(), 3);
+        let col = batch.column(0).as_primitive::<T>();
+        assert_eq!(col.values(), expected);
+        assert_eq!(col.data_type(), &DataType::Timestamp(T::UNIT, timezone));
+    }
+
+    #[test]
+    fn test_parse_timestamp() {
+        test_parse_timestamp_impl::<TimestampNanosecondType>(
+            None,
+            &[0, 0, -7_200_000_000_000],
+        );
+        test_parse_timestamp_impl::<TimestampNanosecondType>(
+            Some("+00:00".to_string()),
+            &[0, 0, -7_200_000_000_000],
+        );
+        test_parse_timestamp_impl::<TimestampNanosecondType>(
+            Some("-05:00".to_string()),
+            &[18_000_000_000_000, 0, -7_200_000_000_000],
+        );
+        test_parse_timestamp_impl::<TimestampMicrosecondType>(
+            Some("-03".to_string()),
+            &[10_800_000_000, 0, -7_200_000_000],
+        );
+        test_parse_timestamp_impl::<TimestampMillisecondType>(
+            Some("-03".to_string()),
+            &[10_800_000, 0, -7_200_000],
+        );
+        test_parse_timestamp_impl::<TimestampSecondType>(
+            Some("-03".to_string()),
+            &[10_800, 0, -7_200],
+        );
+    }
