Skip to content

Commit

Permalink
feat: DateTimeOffset
Browse files Browse the repository at this point in the history
  • Loading branch information
cliedeman committed Sep 9, 2024
1 parent 80d6327 commit d1c16cc
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 9 deletions.
12 changes: 12 additions & 0 deletions src/Parquet.Test/Serialisation/SchemaReflectorTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ class DatesPoco {
[ParquetTimestamp(useLogicalTimestamp: true)]
public DateTime LogicalTimestampDate { get; set; }

[ParquetTimestamp(useLogicalTimestamp: true, isAdjustedToUtc: true)]
public DateTimeOffset DateTimeOffset { get; set; }

[ParquetTimestamp]
public DateTime? NullableTimestampDate { get; set; }

Expand Down Expand Up @@ -394,6 +397,15 @@ public void Type_DateTime_LogicalTimestamp() {
Assert.True(df is DateTimeDataField);
Assert.Equal(DateTimeFormat.Timestamp, ((DateTimeDataField)df).DateTimeFormat);
}

[Fact]
public void Type_DateTimeOffset() {
    // Reflect the POCO and look up the field generated for its DateTimeOffset property.
    ParquetSchema schema = typeof(DatesPoco).GetParquetSchema(true);
    DataField field = schema.FindDataField(nameof(DatesPoco.DateTimeOffset));

    // The property must map to the dedicated DateTimeOffset field type...
    Assert.True(field is DateTimeOffsetDataField);

    // ...and, with no explicit resolution requested, report millisecond precision.
    var offsetField = (DateTimeOffsetDataField)field;
    Assert.Equal(DateTimeTimeUnit.Millis, offsetField.Unit);
}

[Fact]
public void Type_DateTime_TimestampNullable() {
Expand Down
6 changes: 6 additions & 0 deletions src/Parquet.Test/Types/EndToEndTypeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class EndToEndTypeTest : TestBase {
["dateDateAndTime local kind"] = (new DateTimeDataField("dateDateAndTime unknown kind", DateTimeFormat.DateAndTime), new DateTime(2020, 06, 10, 11, 12, 13, DateTimeKind.Local)),
["timestamp utc kind"] = (new DateTimeDataField("timestamp utc kind", DateTimeFormat.Timestamp, true), new DateTime(2020, 06, 10, 11, 12, 13, DateTimeKind.Utc)),
["timestamp local kind"] = (new DateTimeDataField("timestamp local kind", DateTimeFormat.Timestamp, false), new DateTime(2020, 06, 10, 11, 12, 13, DateTimeKind.Local)),
["datetimeoffset utc kind"] = (new DateTimeOffsetDataField("datetimeoffset utc kind", DateTimeTimeUnit.Millis), new DateTimeOffset(2020, 06, 10, 11, 12, 13, TimeSpan.Zero)),
// don't want any excess info in the offset INT32 doesn't contain or care about this data
["dateDate"] = (new DateTimeDataField("dateDate", DateTimeFormat.Date), DateTime.UtcNow.RoundToDay()),
#if !NETCOREAPP3_1
Expand Down Expand Up @@ -128,6 +129,7 @@ public class EndToEndTypeTest : TestBase {
[InlineData("dateDateAndTime local kind")]
[InlineData("timestamp utc kind")]
[InlineData("timestamp local kind")]
[InlineData("datetimeoffset utc kind")]
[InlineData("dateDate")]
#if !NETCOREAPP3_1
[InlineData("dateOnly")]
Expand Down Expand Up @@ -191,6 +193,10 @@ public async Task Type_writes_and_reads_end_to_end(string name) {
? DateTime.SpecifyKind(dtExpected, DateTimeKind.Utc) // assumes value is UTC
: dtExpected.ToUniversalTime();
equal = dtActual.Equals(dtExpected);
} else if(actual.GetType() == typeof(DateTimeOffset)) {
var dtActual = (DateTimeOffset)actual;
var dtExpected = (DateTimeOffset)input.expectedValue!;
equal = dtActual.Equals(dtExpected);
} else {
equal = actual.Equals(input.expectedValue);
}
Expand Down
101 changes: 101 additions & 0 deletions src/Parquet/Encodings/ParquetPlainEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ public static void Encode(
} else if(t == typeof(byte[][])) {
Span<byte[]> span = ((byte[][])data).AsSpan(offset, count);
Encode(span, destination);
} else if(t == typeof(DateTimeOffset[])) {
Span<DateTimeOffset> span = ((DateTimeOffset[])data).AsSpan(offset, count);
Encode(span, destination, tse);
if(stats != null)
FillStats(span, stats);
} else if(t == typeof(DateTime[])) {
Span<DateTime> span = ((DateTime[])data).AsSpan(offset, count);
Encode(span, destination, tse);
Expand Down Expand Up @@ -185,6 +190,9 @@ public static void Decode(
} else if(t == typeof(byte[][])) {
Span<byte[]> span = ((byte[][])data).AsSpan(offset, count);
elementsRead = Decode(source, span, tse);
} else if(t == typeof(DateTimeOffset[])) {
Span<DateTimeOffset> span = ((DateTimeOffset[])data).AsSpan(offset, count);
elementsRead = Decode(source, span, tse);
} else if(t == typeof(DateTime[])) {
Span<DateTime> span = ((DateTime[])data).AsSpan(offset, count);
elementsRead = Decode(source, span, tse);
Expand Down Expand Up @@ -264,6 +272,8 @@ public static bool TryEncode(object? value, SchemaElement tse, out byte[]? resul
} else if(t == typeof(byte[])) {
result = (byte[])value;
return true;
} else if(t == typeof(DateTimeOffset)) {
return TryEncode((DateTimeOffset)value, tse, out result);
} else if(t == typeof(DateTime))
return TryEncode((DateTime)value, tse, out result);
#if NET6_0_OR_GREATER
Expand Down Expand Up @@ -353,6 +363,18 @@ private static decimal TryDecodeDecimal(byte[] value, SchemaElement tse) {
}
}

/// <summary>
/// Encodes a single <see cref="DateTimeOffset"/> as the raw bytes of an INT64 Unix timestamp.
/// </summary>
/// <remarks>
/// The value is expressed in the unit declared by the schema element's TIMESTAMP logical type
/// (micros or nanos), so that a single encoded value agrees with the span-based
/// <c>Encode</c> overload for the same column. When no unit is declared, milliseconds are used,
/// which was the only behaviour before.
/// NOTE(review): callers and the matching decode path are outside this view - confirm they expect
/// unit-aware values.
/// </remarks>
/// <param name="value">Value to encode; its offset is irrelevant because only the UTC instant is stored.</param>
/// <param name="tse">Schema element describing the physical and logical type of the column.</param>
/// <param name="result">Receives the 8 bytes produced by <see cref="BitConverter.GetBytes(long)"/>.</param>
/// <returns>Always <c>true</c> when the physical type is INT64.</returns>
/// <exception cref="InvalidDataException">The physical type is not INT64.</exception>
/// <exception cref="OverflowException">The instant does not fit into INT64 nanoseconds.</exception>
private static bool TryEncode(DateTimeOffset value, SchemaElement tse, out byte[] result) {
    // Ticks (100 ns units) of 1970-01-01T00:00:00Z, i.e. new DateTime(1970, 1, 1).Ticks.
    const long unixEpochTicks = 621355968000000000L;

    switch(tse.Type) {
        case TType.INT64:
            var unit = tse.LogicalType?.TIMESTAMP?.Unit;
            long unixTime;
            if(unit?.MICROS is not null) {
                // 1 microsecond = 10 ticks
                unixTime = (value.UtcTicks - unixEpochTicks) / 10;
            } else if(unit?.NANOS is not null) {
                // 1 tick = 100 nanoseconds; checked so that far-future dates fail loudly instead of wrapping
                unixTime = checked((value.UtcTicks - unixEpochTicks) * 100);
            } else {
                // ToUnixTimeMilliseconds already works on the UTC instant, no ToUniversalTime() needed
                unixTime = value.ToUnixTimeMilliseconds();
            }
            result = BitConverter.GetBytes(unixTime);
            return true;
        default:
            throw new InvalidDataException($"data type '{tse.Type}' does not represent any date types");
    }
}

private static bool TryEncode(DateTime value, SchemaElement tse, out byte[] result) {
switch(tse.Type) {
case TType.INT32:
Expand Down Expand Up @@ -812,6 +834,40 @@ public static int Decode(Span<byte> source, Span<byte[]> data, SchemaElement tse
return read;
}

/// <summary>
/// Writes <see cref="DateTimeOffset"/> values to <paramref name="destination"/> as INT64 Unix timestamps.
/// </summary>
/// <remarks>
/// Every value is converted to its UTC instant first; the original offset is not stored.
/// The timestamp unit is taken from the schema element's TIMESTAMP logical type. MICROS and NANOS
/// are only compiled in for NET7_0_OR_GREATER; on other targets they are reported as an unexpected unit.
/// The unit is validated per element, so an empty <paramref name="data"/> never triggers that error.
/// </remarks>
/// <param name="data">Values to encode.</param>
/// <param name="destination">Stream receiving 8 bytes per value (as produced by <see cref="BitConverter.GetBytes(long)"/>).</param>
/// <param name="tse">Schema element describing the physical and logical type of the column.</param>
/// <exception cref="InvalidDataException">The physical type is not INT64.</exception>
/// <exception cref="ArgumentException">The schema element has no TIMESTAMP logical type.</exception>
/// <exception cref="ParquetException">The timestamp unit is not supported.</exception>
public static void Encode(ReadOnlySpan<DateTimeOffset> data, Stream destination, SchemaElement tse) {
    if(tse.Type != TType.INT64) {
        throw new InvalidDataException($"data type '{tse.Type}' does not represent any date types");
    }

    var timestamp = tse.LogicalType?.TIMESTAMP;
    if(timestamp is null) {
        throw new ArgumentException($"invalid converted type: {tse.ConvertedType}");
    }

    // The unit is the same for the whole column, look it up once.
    var unit = timestamp.Unit;

    foreach(DateTimeOffset element in data) {
        // UtcDateTime has the same ticks as ToUniversalTime().DateTime but carries DateTimeKind.Utc
        // instead of Unspecified, so a kind-sensitive conversion cannot mistake it for local time.
        // NOTE(review): the ToUnix* helpers are defined elsewhere - confirm they are kind-agnostic.
        DateTime utc = element.UtcDateTime;

        long unixTime;
        if(unit.MILLIS is not null) {
            unixTime = utc.ToUnixMilliseconds();
#if NET7_0_OR_GREATER
        } else if(unit.MICROS is not null) {
            unixTime = utc.ToUnixMicroseconds();
        } else if(unit.NANOS is not null) {
            unixTime = utc.ToUnixNanoseconds();
#endif
        } else {
            throw new ParquetException($"Unexpected TimeUnit: {unit}");
        }

        byte[] raw = BitConverter.GetBytes(unixTime);
        destination.Write(raw, 0, raw.Length);
    }
}

public static void Encode(ReadOnlySpan<DateTime> data, Stream destination, SchemaElement tse) {

switch(tse.Type) {
Expand Down Expand Up @@ -900,6 +956,45 @@ public static void Encode(ReadOnlySpan<TimeOnly> data, Stream destination, Schem
}
}
#endif
/// <summary>
/// Decodes INT64 Unix timestamps from <paramref name="source"/> into <see cref="DateTimeOffset"/> values.
/// </summary>
/// <remarks>
/// Only columns whose TIMESTAMP logical type is flagged as adjusted to UTC are accepted.
/// Values are produced via <see cref="DateTimeOffset.FromUnixTimeMilliseconds(long)"/>, so they carry a zero offset.
/// Nanosecond values are truncated to tick (100 ns) precision.
/// </remarks>
/// <param name="source">Raw encoded bytes.</param>
/// <param name="data">Destination span; at most <c>data.Length</c> values are decoded.</param>
/// <param name="tse">Schema element describing the physical and logical type of the column.</param>
/// <returns>Number of values written to <paramref name="data"/>.</returns>
/// <exception cref="NotSupportedException">The physical type is not INT64.</exception>
/// <exception cref="ArgumentException">The schema element has no TIMESTAMP logical type.</exception>
/// <exception cref="ParquetException">The timestamp is not adjusted to UTC, or its unit is not recognised.</exception>
public static int Decode(Span<byte> source, Span<DateTimeOffset> data, SchemaElement tse) {
    if(tse.Type != TType.INT64) {
        throw new NotSupportedException($"data type '{tse.Type}' cannot be decoded as {nameof(DateTimeOffset)}");
    }

    // Validate the schema before renting a buffer or decoding anything.
    var timestamp = tse.LogicalType?.TIMESTAMP;
    if(timestamp is null) {
        throw new ArgumentException(
            $"INT64 column cannot be decoded as {nameof(DateTimeOffset)} because it has no TIMESTAMP logical type (converted type: {tse.ConvertedType})");
    }

    if(!timestamp.IsAdjustedToUTC) {
        throw new ParquetException("Cannot decode Timestamp to DateTimeOffset with IsAdjustedToUTC set to false");
    }

    // The unit is the same for the whole column, look it up once.
    var unit = timestamp.Unit;

    long[] longs = ArrayPool<long>.Shared.Rent(data.Length);
    try {
        // The rented array may be longer than requested, so slice it to the destination size.
        int longsRead = Decode(source, longs.AsSpan(0, data.Length));

        for(int i = 0; i < longsRead; i++) {
            long raw = longs[i];

            if(unit.MILLIS is not null) {
                data[i] = DateTimeOffset.FromUnixTimeMilliseconds(raw);
            } else if(unit.MICROS is not null) {
                // Split into whole milliseconds and leftover microseconds. '/' truncates and '%'
                // keeps the sign of the dividend, so both parts recombine exactly for negative values too.
                long wholeMillis = raw / 1000;
                long leftoverMicros = raw % 1000;
                data[i] = DateTimeOffset.FromUnixTimeMilliseconds(wholeMillis).AddTicks(leftoverMicros * 10); // 1 microsecond = 10 ticks
            } else if(unit.NANOS is not null) {
                long wholeMillis = raw / 1000000;
                long leftoverNanos = raw % 1000000;
                data[i] = DateTimeOffset.FromUnixTimeMilliseconds(wholeMillis).AddTicks(leftoverNanos / 100); // 1 tick = 100 nanoseconds
            } else {
                throw new ParquetException($"Unexpected TimeUnit: {unit}");
            }
        }

        return longsRead;
    } finally {
        ArrayPool<long>.Shared.Return(longs);
    }
}

public static int Decode(Span<byte> source, Span<DateTime> data, SchemaElement tse) {
switch(tse.Type) {
Expand Down Expand Up @@ -1300,6 +1395,12 @@ public static void FillStats(ReadOnlySpan<float> data, DataColumnStatistics stat
stats.MaxValue = max;
}

/// <summary>
/// Stores the smallest and largest value of <paramref name="data"/> in <paramref name="stats"/>.
/// </summary>
/// <param name="data">Values to scan.</param>
/// <param name="stats">Statistics object whose <c>MinValue</c> and <c>MaxValue</c> are overwritten.</param>
public static void FillStats(ReadOnlySpan<DateTimeOffset> data, DataColumnStatistics stats) {
    data.MinMax(out DateTimeOffset lowest, out DateTimeOffset highest);

    stats.MinValue = lowest;
    stats.MaxValue = highest;
}

public static void FillStats(ReadOnlySpan<DateTime> data, DataColumnStatistics stats) {
data.MinMax(out DateTime min, out DateTime max);
stats.MinValue = min;
Expand Down
29 changes: 28 additions & 1 deletion src/Parquet/Encodings/SchemaEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ static class SchemaEncoder {
typeof(decimal),
typeof(BigInteger),
typeof(DateTime),
typeof(DateTimeOffset),
#if NET6_0_OR_GREATER
typeof(DateOnly),
typeof(TimeOnly),
Expand Down Expand Up @@ -472,7 +473,8 @@ public static SchemaElement Encode(DataField field) {
break;
case DateTimeFormat.Timestamp:
tse.Type = Type.INT64;
tse.LogicalType = new LogicalType { TIMESTAMP = new TimestampType {
tse.LogicalType = new LogicalType {
TIMESTAMP = new TimestampType {
IsAdjustedToUTC = dfDateTime.IsAdjustedToUTC,
Unit = dfDateTime.Unit switch {
DateTimeTimeUnit.Millis => new TimeUnit {
Expand All @@ -493,9 +495,34 @@ public static SchemaElement Encode(DataField field) {
tse.Type = Type.INT96;
break;
}
} else if(field is DateTimeOffsetDataField) {
throw new ParquetException($"Unexpected DataField: {field.GetType()} should be DateTimeDataField");
} else {
tse.Type = Type.INT96;
}
} else if(st == typeof(DateTimeOffset)) {
if(field is DateTimeOffsetDataField dtDateTimeOffset) {
tse.Type = Type.INT64;
tse.LogicalType = new LogicalType {
TIMESTAMP = new TimestampType {
IsAdjustedToUTC = dtDateTimeOffset.IsAdjustedToUTC,
Unit = dtDateTimeOffset.Unit switch {
DateTimeTimeUnit.Millis => new TimeUnit {
MILLIS = new MilliSeconds(),
},
DateTimeTimeUnit.Micros => new TimeUnit {
MICROS = new MicroSeconds(),
},
DateTimeTimeUnit.Nanos => new TimeUnit {
NANOS = new NanoSeconds(),
},
_ => throw new ParquetException($"Unexpected TimeUnit: {dtDateTimeOffset.Unit}")
}
}
};
} else {
throw new ParquetException($"Unexpected DataField: {field.GetType()} should be DateTimeOffsetDataField");
}
#if NET6_0_OR_GREATER
} else if(st == typeof(DateOnly)) {
// DateOnly
Expand Down
11 changes: 11 additions & 0 deletions src/Parquet/Extensions/SpanExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,17 @@ public static void MinMax(this ReadOnlySpan<float> span, out float min, out floa
}
}

/// <summary>
/// Finds the earliest and latest <see cref="DateTimeOffset"/> in <paramref name="span"/>.
/// </summary>
/// <param name="span">Values to scan.</param>
/// <param name="min">Receives the earliest value, or <c>default(DateTimeOffset)</c> when the span is empty.</param>
/// <param name="max">Receives the latest value, or <c>default(DateTimeOffset)</c> when the span is empty.</param>
public static void MinMax(this ReadOnlySpan<DateTimeOffset> span, out DateTimeOffset min, out DateTimeOffset max) {
    if(span.IsEmpty) {
        min = default(DateTimeOffset);
        max = default(DateTimeOffset);
        return;
    }

    // Seed both bounds with the first element, then scan the remainder.
    DateTimeOffset lowest = span[0];
    DateTimeOffset highest = lowest;

    for(int idx = 1; idx < span.Length; idx++) {
        DateTimeOffset candidate = span[idx];

        if(candidate < lowest) {
            lowest = candidate;
        }

        if(candidate > highest) {
            highest = candidate;
        }
    }

    min = lowest;
    max = highest;
}

public static void MinMax(this ReadOnlySpan<DateTime> span, out DateTime min, out DateTime max) {
min = span.IsEmpty ? default(DateTime) : span[0];
max = min;
Expand Down
9 changes: 2 additions & 7 deletions src/Parquet/Schema/DataField.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,8 @@ public DataField(string name, Type clrType, bool? isNullable = null, bool? isArr

Discover(clrType, isCompiledWithNullable ?? true, out Type baseType, out bool discIsArray, out bool discIsNullable);
ClrType = baseType;
if(!SchemaEncoder.IsSupported(ClrType)) {
if(baseType == typeof(DateTimeOffset)) {
throw new NotSupportedException($"{nameof(DateTimeOffset)} support was dropped due to numerous ambiguity issues, please use {nameof(DateTime)} from now on.");
}
else {
throw new NotSupportedException($"type {clrType} is not supported");
}
if(!SchemaEncoder.IsSupported(ClrType)){
throw new NotSupportedException($"type {clrType} is not supported");
}

IsNullable = isNullable ?? discIsNullable;
Expand Down
32 changes: 32 additions & 0 deletions src/Parquet/Schema/DateTimeOffsetDataField.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System;

namespace Parquet.Schema {
    /// <summary>
    /// Schema element for <see cref="DateTimeOffset"/> which allows to specify precision.
    /// </summary>
    public class DateTimeOffsetDataField : DataField {
        /// <summary>
        /// Whether the timestamp is adjusted to UTC. Always <c>true</c> for this field type.
        /// </summary>
        public bool IsAdjustedToUTC => true;

        /// <summary>
        /// Time unit (precision) of the timestamp.
        /// </summary>
        public DateTimeTimeUnit Unit { get; }


        /// <summary>
        /// Initializes a new instance of the <see cref="DateTimeOffsetDataField"/> class.
        /// </summary>
        /// <param name="name">The name.</param>
        /// <param name="unit">Time unit of the timestamp. When not set, <see cref="DateTimeTimeUnit.Millis"/> is used.</param>
        /// <param name="isNullable">Optional nullability override, passed through to the <see cref="DataField"/> constructor.</param>
        /// <param name="isArray">Optional array flag override, passed through to the <see cref="DataField"/> constructor.</param>
        /// <param name="propertyName">When set, uses this property to get the field's data. When not set, uses the property that matches the name parameter.</param>
        public DateTimeOffsetDataField(string name, DateTimeTimeUnit? unit = null, bool? isNullable = null, bool? isArray = null, string? propertyName = null)
            : base(name, typeof(DateTimeOffset), isNullable, isArray, propertyName) {
            Unit = unit ?? DateTimeTimeUnit.Millis;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ public class ParquetTimestampAttribute : Attribute {
/// </summary>
/// <param name="resolution"></param>
/// <param name="useLogicalTimestamp"></param>
public ParquetTimestampAttribute(ParquetTimestampResolution resolution = ParquetTimestampResolution.Milliseconds, bool useLogicalTimestamp = false) {
/// <param name="isAdjustedToUtc"></param>
public ParquetTimestampAttribute(ParquetTimestampResolution resolution = ParquetTimestampResolution.Milliseconds, bool useLogicalTimestamp = false, bool isAdjustedToUtc = false) {
Resolution = resolution;
UseLogicalTimestamp = useLogicalTimestamp;
IsAdjustedToUTC = isAdjustedToUtc;
}

/// <summary>
Expand Down
10 changes: 10 additions & 0 deletions src/Parquet/Serialization/TypeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,16 @@ private static Field ConstructDataField(string name, string propertyName, Type t
isAdjustedToUTC: tsa == null ? true : tsa.IsAdjustedToUTC,
unit: tsa?.Resolution.Convert(),
isNullable: t == typeof(DateTime?), null, propertyName);
} else if (t == typeof(DateTimeOffset) || t == typeof(DateTimeOffset?)) {
ParquetTimestampAttribute? tsa = member?.TimestampAttribute;

if(tsa?.IsAdjustedToUTC != true) {
throw new ParquetException("DateTimeOffset must have IsAdjustedToUTC set to true");
}

r = new DateTimeOffsetDataField(name,
unit: tsa?.Resolution.Convert(),
isNullable: t == typeof(DateTimeOffset?), null, propertyName);
} else if(t == typeof(TimeSpan) || t == typeof(TimeSpan?)) {
r = new TimeSpanDataField(name,
member?.MicroSecondsTimeAttribute == null
Expand Down

0 comments on commit d1c16cc

Please sign in to comment.