Skip to content

Commit

Permalink
feat: Logical Timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
cliedeman committed Jun 15, 2024
1 parent 3139f8d commit 33609a3
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 6 deletions.
12 changes: 12 additions & 0 deletions src/Parquet.Test/Serialisation/SchemaReflectorTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,9 @@ class DatesPoco {
[ParquetTimestamp]
public DateTime TimestampDate { get; set; }

[ParquetTimestamp(useLogicalTimestamp: true)]
public DateTime LogicalTimestampDate { get; set; }

[ParquetTimestamp]
public DateTime? NullableTimestampDate { get; set; }

Expand Down Expand Up @@ -383,6 +386,15 @@ public void Type_DateTime_Timestamp() {
Assert.Equal(DateTimeFormat.DateAndTime, ((DateTimeDataField)df).DateTimeFormat);
}

[Fact]
public void Type_DateTime_LogicalTimestamp() {
    // Reflect the POCO into a Parquet schema and locate the logical-timestamp property.
    ParquetSchema schema = typeof(DatesPoco).GetParquetSchema(true);
    DataField field = schema.FindDataField(nameof(DatesPoco.LogicalTimestampDate));

    // The property must surface as a date/time field that uses the logical Timestamp format.
    DateTimeDataField dateTimeField = Assert.IsAssignableFrom<DateTimeDataField>(field);
    Assert.Equal(DateTimeFormat.Timestamp, dateTimeField.DateTimeFormat);
}

[Fact]
public void Type_DateTime_TimestampNullable() {
ParquetSchema s = typeof(DatesPoco).GetParquetSchema(true);
Expand Down
9 changes: 9 additions & 0 deletions src/Parquet.Test/Types/EndToEndTypeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class EndToEndTypeTest : TestBase {
["dateTime local kind"] = (new DataField<DateTime>("dateTime unknown kind"), new DateTime(2020, 06, 10, 11, 12, 13, DateTimeKind.Local)),
["impala date local kind"] = (new DateTimeDataField("dateImpala unknown kind", DateTimeFormat.Impala), new DateTime(2020, 06, 10, 11, 12, 13, DateTimeKind.Local)),
["dateDateAndTime local kind"] = (new DateTimeDataField("dateDateAndTime unknown kind", DateTimeFormat.DateAndTime), new DateTime(2020, 06, 10, 11, 12, 13, DateTimeKind.Local)),
["timestamp utc kind"] = (new DateTimeDataField("timestamp utc kind", DateTimeFormat.Timestamp, true), new DateTime(2020, 06, 10, 11, 12, 13, DateTimeKind.Utc)),
["timestamp local kind"] = (new DateTimeDataField("timestamp local kind", DateTimeFormat.Timestamp, false), new DateTime(2020, 06, 10, 11, 12, 13, DateTimeKind.Local)),
// don't want any excess info in the offset INT32 doesn't contain or care about this data
["dateDate"] = (new DateTimeDataField("dateDate", DateTimeFormat.Date), DateTime.UtcNow.RoundToDay()),
#if !NETCOREAPP3_1
Expand Down Expand Up @@ -124,6 +126,8 @@ public class EndToEndTypeTest : TestBase {
[InlineData("dateTime local kind")]
[InlineData("impala date local kind")]
[InlineData("dateDateAndTime local kind")]
[InlineData("timestamp utc kind")]
[InlineData("timestamp local kind")]
[InlineData("dateDate")]
#if !NETCOREAPP3_1
[InlineData("dateOnly")]
Expand Down Expand Up @@ -174,6 +178,11 @@ public async Task Type_writes_and_reads_end_to_end(string name) {
equal = true;
else if(actual.GetType().IsArrayOf<byte>() && input.expectedValue != null) {
equal = ((byte[])actual).SequenceEqual((byte[])input.expectedValue);
} else if(input.field is DateTimeDataField { DateTimeFormat: DateTimeFormat.Timestamp }) {
var dtActual = (DateTime)actual;
var dtExpected = (DateTime)input.expectedValue!;
Assert.Equal(dtExpected.Kind, dtActual.Kind);
equal = dtActual.Equals(dtExpected);
} else if(actual.GetType() == typeof(DateTime)) {
var dtActual = (DateTime)actual;
Assert.Equal(DateTimeKind.Utc, dtActual.Kind);
Expand Down
18 changes: 16 additions & 2 deletions src/Parquet/Encodings/ParquetPlainEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,14 @@ public static void Encode(ReadOnlySpan<DateTime> data, Stream destination, Schem
}
break;
case TType.INT64:
if(tse.ConvertedType == ConvertedType.TIMESTAMP_MILLIS) {
if(tse.LogicalType?.TIMESTAMP is not null) {
foreach(DateTime element in data) {
// TODO: check docs
long unixTime = element.ToUnixMilliseconds();
byte[] raw = BitConverter.GetBytes(unixTime);
destination.Write(raw, 0, raw.Length);
}
} else if(tse.ConvertedType == ConvertedType.TIMESTAMP_MILLIS) {
foreach(DateTime element in data) {
long unixTime = element.ToUtc().ToUnixMilliseconds();
byte[] raw = BitConverter.GetBytes(unixTime);
Expand Down Expand Up @@ -896,7 +903,14 @@ public static int Decode(Span<byte> source, Span<DateTime> data, SchemaElement t
long[] longs = ArrayPool<long>.Shared.Rent(data.Length);
try {
int longsRead = Decode(source, longs.AsSpan(0, data.Length));
if(tse.ConvertedType == ConvertedType.TIMESTAMP_MICROS) {
if(tse.LogicalType?.TIMESTAMP is not null) {
// TODO: unit
for(int i = 0; i < longsRead; i++) {
DateTime dt = longs[i].AsUnixMillisecondsInDateTime();
dt = DateTime.SpecifyKind(dt, tse.LogicalType.TIMESTAMP.IsAdjustedToUTC ? DateTimeKind.Utc : DateTimeKind.Local);
data[i] = dt;
}
} else if(tse.ConvertedType == ConvertedType.TIMESTAMP_MICROS) {
for(int i = 0; i < longsRead; i++) {
long lv = longs[i];
long microseconds = lv % 1000;
Expand Down
15 changes: 14 additions & 1 deletion src/Parquet/Encodings/SchemaEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ private static bool TryBuildDataField(SchemaElement se, ParquetOptions options,
_ => typeof(int)
},
Type.INT32 => typeof(int),

Type.INT64 when se.LogicalType?.TIMESTAMP != null => typeof(DateTime),
Type.INT64 when se.ConvertedType != null => se.ConvertedType switch {
ConvertedType.INT_64 => typeof(long),
ConvertedType.UINT_64 => typeof(ulong),
Expand Down Expand Up @@ -267,6 +267,11 @@ private static DataField GetDecimalDataField(SchemaElement se) =>
se.Scale.GetValueOrDefault(DecimalFormatDefaults.DefaultScale));

private static DataField GetDateTimeDataField(SchemaElement se) {
if(se.LogicalType is not null)
if(se.LogicalType.TIMESTAMP is not null)
// TODO: unit
return new DateTimeDataField(se.Name, DateTimeFormat.Timestamp, se.LogicalType.TIMESTAMP.IsAdjustedToUTC);

switch(se.ConvertedType) {
case ConvertedType.TIMESTAMP_MILLIS:
if(se.Type == Type.INT64)
Expand Down Expand Up @@ -465,6 +470,14 @@ public static SchemaElement Encode(DataField field) {
tse.Type = Type.INT32;
tse.ConvertedType = ConvertedType.DATE;
break;
case DateTimeFormat.Timestamp:
tse.Type = Type.INT64;
tse.LogicalType = new LogicalType { TIMESTAMP = new TimestampType {
IsAdjustedToUTC = dfDateTime.IsAdjustedToUTC,
// TODO: unit
// Unit =
} };
break;
default:
tse.Type = Type.INT96;
break;
Expand Down
9 changes: 8 additions & 1 deletion src/Parquet/Schema/DateTimeDataField.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,25 @@ public class DateTimeDataField : DataField {
/// Desired data format, Parquet specific
/// </summary>
public DateTimeFormat DateTimeFormat { get; }

/// <summary>
/// Whether the timestamp is flagged as adjusted to UTC. Set from the constructor argument of the same name.
/// When <see cref="DateTimeFormat"/> is <see cref="Parquet.Schema.DateTimeFormat.Timestamp"/>, the schema encoder
/// copies this value into the logical TIMESTAMP type's IsAdjustedToUTC flag.
/// </summary>
public bool IsAdjustedToUTC { get; }

/// <summary>
/// Initializes a new instance of the <see cref="DateTimeDataField"/> class.
/// </summary>
/// <remarks>
/// NOTE(review): <paramref name="isAdjustedToUTC"/> was inserted ahead of <paramref name="isNullable"/>.
/// A caller that previously passed a <c>bool</c> positionally as the third argument (meaning "is nullable")
/// now binds that value to <paramref name="isAdjustedToUTC"/> instead; prefer named arguments.
/// </remarks>
/// <param name="name">The name.</param>
/// <param name="format">The format.</param>
/// <param name="isAdjustedToUTC">Value stored in <see cref="IsAdjustedToUTC"/>. Defaults to <c>true</c>.</param>
/// <param name="isNullable">Forwarded unchanged to the base <see cref="DataField"/> constructor.</param>
/// <param name="isArray">Forwarded unchanged to the base <see cref="DataField"/> constructor.</param>
/// <param name="propertyName">When set, uses this property to get the field's data. When not set, uses the property that matches the name parameter.</param>
public DateTimeDataField(string name, DateTimeFormat format, bool? isNullable = null, bool? isArray = null, string? propertyName = null)
public DateTimeDataField(string name, DateTimeFormat format, bool isAdjustedToUTC = true, bool? isNullable = null, bool? isArray = null, string? propertyName = null)
: base(name, typeof(DateTime), isNullable, isArray, propertyName) {
DateTimeFormat = format;
IsAdjustedToUTC = isAdjustedToUTC;
}
}
}
7 changes: 6 additions & 1 deletion src/Parquet/Schema/DateTimeFormat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ public enum DateTimeFormat {
/// <summary>
/// Only stores a date. Time portion is truncated. Internally stored as INT32
/// </summary>
Date
Date,

/// <summary>
/// Logical Type Timestamp. The schema encoder writes fields with this format as INT64
/// with the TIMESTAMP logical type set.
/// </summary>
Timestamp,
}
}
60 changes: 60 additions & 0 deletions src/Parquet/Schema/TimestampDataField.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using System;
using Parquet.Meta;

namespace Parquet.Schema {
/// <summary>
/// Schema element for <see cref="DateTime"/> values that carries the settings of a
/// LogicalType Timestamp: a time unit and an "adjusted to UTC" flag.
/// </summary>
public class TimestampDataField : DataField {
    /// <summary>
    /// Whether the timestamp is flagged as adjusted to UTC; taken from the constructor argument.
    /// </summary>
    public bool IsAdjustedToUTC { get; }

    /// <summary>
    /// Time unit this field was declared with; taken from the constructor argument.
    /// </summary>
    public Types.TimeUnit Unit { get; }

    /// <summary>
    /// Initializes a new instance of the <see cref="TimestampDataField"/> class.
    /// </summary>
    /// <param name="name">The name.</param>
    /// <param name="unit">Time unit stored in <see cref="Unit"/>. Defaults to <see cref="Types.TimeUnit.MILLIS"/>.</param>
    /// <param name="isAdjustedToUTC">Value stored in <see cref="IsAdjustedToUTC"/>. Defaults to <c>true</c>.</param>
    /// <param name="isNullable">Forwarded unchanged to the base <see cref="DataField"/> constructor.</param>
    /// <param name="isArray">Forwarded unchanged to the base <see cref="DataField"/> constructor.</param>
    /// <param name="propertyName">When set, uses this property to get the field's data. When not set, uses the property that matches the name parameter.</param>
    public TimestampDataField(
        string name,
        Types.TimeUnit unit = Types.TimeUnit.MILLIS,
        bool isAdjustedToUTC = true,
        bool? isNullable = null,
        bool? isArray = null,
        string? propertyName = null)
        : base(name, typeof(DateTime), isNullable, isArray, propertyName)
        => (IsAdjustedToUTC, Unit) = (isAdjustedToUTC, unit);
}

/// <summary>
/// Container for the nested types used by <see cref="TimestampDataField"/>.
/// </summary>
public static class Types {
/// <summary>
/// Time unit a <see cref="TimestampDataField"/> can be declared with.
/// </summary>
public enum TimeUnit {
/// <summary>
/// Milliseconds. This is the default unit of the <see cref="TimestampDataField"/> constructor.
/// </summary>
MILLIS,

/// <summary>
/// Microseconds.
/// </summary>
MICROS,

/// <summary>
/// Nanoseconds.
/// </summary>
NANOS,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,31 @@ public class ParquetTimestampAttribute : Attribute {
/// Creates an instance of the attribute
/// </summary>
/// <param name="resolution">Resolution used to choose the date/time format when <paramref name="useLogicalTimestamp"/> is <c>false</c>.</param>
/// <param name="useLogicalTimestamp">When <c>true</c>, the field uses <see cref="DateTimeFormat.Timestamp"/> instead of a format derived from <paramref name="resolution"/>.</param>
/// <param name="isAdjustedToUTC">Value exposed through <see cref="IsAdjustedToUTC"/>. Defaults to <c>true</c>, the same default <see cref="DateTimeDataField"/> uses.</param>
public ParquetTimestampAttribute(
    ParquetTimestampResolution resolution = ParquetTimestampResolution.Milliseconds,
    bool useLogicalTimestamp = false,
    bool isAdjustedToUTC = true) {
    Resolution = resolution;
    UseLogicalTimestamp = useLogicalTimestamp;
    // IsAdjustedToUTC has a private setter and was never assigned, so it was always false
    // and schema reflection forwarded that false into every attributed DateTimeDataField.
    IsAdjustedToUTC = isAdjustedToUTC;
}

/// <summary>
/// Resolution of Parquet timestamp
/// </summary>
public ParquetTimestampResolution Resolution { get; private set; }

/// <summary>
/// When <c>true</c>, the attributed property is mapped to <see cref="DateTimeFormat.Timestamp"/>
/// (the logical timestamp type) regardless of <see cref="Resolution"/>.
/// </summary>
public bool UseLogicalTimestamp { get; private set; }

/// <summary>
/// Whether the timestamp is flagged as adjusted to UTC. This value is passed to
/// <see cref="DateTimeDataField"/> when a field is constructed from this attribute.
/// </summary>
public bool IsAdjustedToUTC { get; private set; }

internal DateTimeFormat GetDateTimeFormat() {
if(UseLogicalTimestamp)
return DateTimeFormat.Timestamp;

return Resolution switch {
ParquetTimestampResolution.Milliseconds => DateTimeFormat.DateAndTime,
#if NET7_0_OR_GREATER
Expand Down
1 change: 1 addition & 0 deletions src/Parquet/Serialization/TypeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ private static Field ConstructDataField(string name, string propertyName, Type t
ParquetTimestampAttribute? tsa = member?.TimestampAttribute;
r = new DateTimeDataField(name,
tsa == null ? DateTimeFormat.Impala : tsa.GetDateTimeFormat(),
tsa == null ? true : tsa.IsAdjustedToUTC,
t == typeof(DateTime?), null, propertyName);
} else if(t == typeof(TimeSpan) || t == typeof(TimeSpan?)) {
r = new TimeSpanDataField(name,
Expand Down

0 comments on commit 33609a3

Please sign in to comment.