Skip to content

Commit

Permalink
feat: Logical Timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
cliedeman committed Jun 16, 2024
1 parent 3139f8d commit f13d05c
Show file tree
Hide file tree
Showing 11 changed files with 214 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/Parquet.Test/Schema/SchemaTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ public async Task Column_called_root() {

[Fact]
public async Task ReadSchemaActuallyEqualToWriteSchema() {
var field = new DateTimeDataField("Date", DateTimeFormat.DateAndTime, true);
var field = new DateTimeDataField("Date", DateTimeFormat.DateAndTime, isNullable: true);
var schema = new ParquetSchema(field);

using var memoryStream = new MemoryStream();
Expand Down
12 changes: 12 additions & 0 deletions src/Parquet.Test/Serialisation/SchemaReflectorTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,9 @@ class DatesPoco {
[ParquetTimestamp]
public DateTime TimestampDate { get; set; }

[ParquetTimestamp(useLogicalTimestamp: true)]
public DateTime LogicalTimestampDate { get; set; }

[ParquetTimestamp]
public DateTime? NullableTimestampDate { get; set; }

Expand Down Expand Up @@ -383,6 +386,15 @@ public void Type_DateTime_Timestamp() {
Assert.Equal(DateTimeFormat.DateAndTime, ((DateTimeDataField)df).DateTimeFormat);
}

[Fact]
public void Type_DateTime_LogicalTimestamp() {
    // A property marked [ParquetTimestamp(useLogicalTimestamp: true)] must be
    // reflected as a DateTimeDataField that uses the logical Timestamp format.
    ParquetSchema schema = typeof(DatesPoco).GetParquetSchema(true);
    DataField field = schema.FindDataField(nameof(DatesPoco.LogicalTimestampDate));

    Assert.True(field is DateTimeDataField);

    var dateTimeField = (DateTimeDataField)field;
    Assert.Equal(DateTimeFormat.Timestamp, dateTimeField.DateTimeFormat);
}

[Fact]
public void Type_DateTime_TimestampNullable() {
ParquetSchema s = typeof(DatesPoco).GetParquetSchema(true);
Expand Down
11 changes: 10 additions & 1 deletion src/Parquet.Test/Types/EndToEndTypeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class EndToEndTypeTest : TestBase {
["dateTime local kind"] = (new DataField<DateTime>("dateTime unknown kind"), new DateTime(2020, 06, 10, 11, 12, 13, DateTimeKind.Local)),
["impala date local kind"] = (new DateTimeDataField("dateImpala unknown kind", DateTimeFormat.Impala), new DateTime(2020, 06, 10, 11, 12, 13, DateTimeKind.Local)),
["dateDateAndTime local kind"] = (new DateTimeDataField("dateDateAndTime unknown kind", DateTimeFormat.DateAndTime), new DateTime(2020, 06, 10, 11, 12, 13, DateTimeKind.Local)),
["timestamp utc kind"] = (new DateTimeDataField("timestamp utc kind", DateTimeFormat.Timestamp, true), new DateTime(2020, 06, 10, 11, 12, 13, DateTimeKind.Utc)),
["timestamp local kind"] = (new DateTimeDataField("timestamp local kind", DateTimeFormat.Timestamp, false), new DateTime(2020, 06, 10, 11, 12, 13, DateTimeKind.Local)),
// don't want any excess info in the offset INT32 doesn't contain or care about this data
["dateDate"] = (new DateTimeDataField("dateDate", DateTimeFormat.Date), DateTime.UtcNow.RoundToDay()),
#if !NETCOREAPP3_1
Expand Down Expand Up @@ -84,7 +86,7 @@ public class EndToEndTypeTest : TestBase {
["unsigned long max value"] = (new DataField<ulong>("ulong"), ulong.MaxValue),

["nullable decimal"] = (new DecimalDataField("decimal?", 4, 1, true, true), null),
["nullable DateTime"] = (new DateTimeDataField("DateTime?", DateTimeFormat.DateAndTime, true), null),
["nullable DateTime"] = (new DateTimeDataField("DateTime?", DateTimeFormat.DateAndTime, isNullable: true), null),

["bool"] = (new DataField<bool>("bool"), true),
["nullable bool"] = (new DataField<bool?>("bool?"), new bool?(true)),
Expand Down Expand Up @@ -124,6 +126,8 @@ public class EndToEndTypeTest : TestBase {
[InlineData("dateTime local kind")]
[InlineData("impala date local kind")]
[InlineData("dateDateAndTime local kind")]
[InlineData("timestamp utc kind")]
[InlineData("timestamp local kind")]
[InlineData("dateDate")]
#if !NETCOREAPP3_1
[InlineData("dateOnly")]
Expand Down Expand Up @@ -174,6 +178,11 @@ public async Task Type_writes_and_reads_end_to_end(string name) {
equal = true;
else if(actual.GetType().IsArrayOf<byte>() && input.expectedValue != null) {
equal = ((byte[])actual).SequenceEqual((byte[])input.expectedValue);
} else if(input.field is DateTimeDataField { DateTimeFormat: DateTimeFormat.Timestamp }) {
var dtActual = (DateTime)actual;
var dtExpected = (DateTime)input.expectedValue!;
Assert.Equal(dtExpected.Kind, dtActual.Kind);
equal = dtActual.Equals(dtExpected);
} else if(actual.GetType() == typeof(DateTime)) {
var dtActual = (DateTime)actual;
Assert.Equal(DateTimeKind.Utc, dtActual.Kind);
Expand Down
49 changes: 47 additions & 2 deletions src/Parquet/Encodings/ParquetPlainEncoder.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Buffers;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Numerics;
Expand Down Expand Up @@ -822,7 +823,27 @@ public static void Encode(ReadOnlySpan<DateTime> data, Stream destination, Schem
}
break;
case TType.INT64:
if(tse.ConvertedType == ConvertedType.TIMESTAMP_MILLIS) {
if(tse.LogicalType?.TIMESTAMP is not null) {
foreach(DateTime element in data) {
if(tse.LogicalType.TIMESTAMP.Unit.MILLIS is not null) {
long unixTime = element.ToUnixMilliseconds();
byte[] raw = BitConverter.GetBytes(unixTime);
destination.Write(raw, 0, raw.Length);
#if NET7_0_OR_GREATER
} else if (tse.LogicalType.TIMESTAMP.Unit.MICROS is not null) {
long unixTime = element.ToUtc().ToUnixMicroseconds();
byte[] raw = BitConverter.GetBytes(unixTime);
destination.Write(raw, 0, raw.Length);
} else if (tse.LogicalType.TIMESTAMP.Unit.NANOS is not null) {
long unixTime = element.ToUtc().ToUnixNanoseconds();
byte[] raw = BitConverter.GetBytes(unixTime);
destination.Write(raw, 0, raw.Length);
#endif
} else {
throw new ParquetException($"Unexpected TimeUnit: {tse.LogicalType.TIMESTAMP.Unit}");
}
}
} else if(tse.ConvertedType == ConvertedType.TIMESTAMP_MILLIS) {
foreach(DateTime element in data) {
long unixTime = element.ToUtc().ToUnixMilliseconds();
byte[] raw = BitConverter.GetBytes(unixTime);
Expand Down Expand Up @@ -896,7 +917,31 @@ public static int Decode(Span<byte> source, Span<DateTime> data, SchemaElement t
long[] longs = ArrayPool<long>.Shared.Rent(data.Length);
try {
int longsRead = Decode(source, longs.AsSpan(0, data.Length));
if(tse.ConvertedType == ConvertedType.TIMESTAMP_MICROS) {
if(tse.LogicalType?.TIMESTAMP is not null) {
for(int i = 0; i < longsRead; i++) {
if(tse.LogicalType.TIMESTAMP.Unit.MILLIS is not null) {
DateTime dt = longs[i].AsUnixMillisecondsInDateTime();
dt = DateTime.SpecifyKind(dt, tse.LogicalType.TIMESTAMP.IsAdjustedToUTC ? DateTimeKind.Utc : DateTimeKind.Local);
data[i] = dt;
} else if(tse.LogicalType.TIMESTAMP.Unit.MICROS is not null) {
long lv = longs[i];
long microseconds = lv % 1000;
lv /= 1000;
DateTime dt = lv.AsUnixMillisecondsInDateTime().AddTicks(microseconds * 10);
dt = DateTime.SpecifyKind(dt, tse.LogicalType.TIMESTAMP.IsAdjustedToUTC ? DateTimeKind.Utc : DateTimeKind.Local);
data[i] = dt;
} else if(tse.LogicalType.TIMESTAMP.Unit.NANOS is not null) {
long lv = longs[i];
long nanoseconds = lv % 1000000;
lv /= 1000000;
DateTime dt = lv.AsUnixMillisecondsInDateTime().AddTicks(nanoseconds / 100); // 1 tick = 100 nanoseconds
dt = DateTime.SpecifyKind(dt, tse.LogicalType.TIMESTAMP.IsAdjustedToUTC ? DateTimeKind.Utc : DateTimeKind.Local);
data[i] = dt;
} else {
throw new ApplicationException($"Unexpected Unit: {tse.LogicalType.TIMESTAMP.Unit}");
}
}
} else if(tse.ConvertedType == ConvertedType.TIMESTAMP_MICROS) {
for(int i = 0; i < longsRead; i++) {
long lv = longs[i];
long microseconds = lv % 1000;
Expand Down
26 changes: 25 additions & 1 deletion src/Parquet/Encodings/SchemaEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Parquet.File.Values.Primitives;
using Parquet.Meta;
using Parquet.Schema;
using Parquet.Serialization;
using SType = System.Type;
using Type = Parquet.Meta.Type;

Expand Down Expand Up @@ -201,7 +202,7 @@ private static bool TryBuildDataField(SchemaElement se, ParquetOptions options,
_ => typeof(int)
},
Type.INT32 => typeof(int),

Type.INT64 when se.LogicalType?.TIMESTAMP != null => typeof(DateTime),
Type.INT64 when se.ConvertedType != null => se.ConvertedType switch {
ConvertedType.INT_64 => typeof(long),
ConvertedType.UINT_64 => typeof(ulong),
Expand Down Expand Up @@ -267,6 +268,10 @@ private static DataField GetDecimalDataField(SchemaElement se) =>
se.Scale.GetValueOrDefault(DecimalFormatDefaults.DefaultScale));

private static DataField GetDateTimeDataField(SchemaElement se) {
if(se.LogicalType is not null)
if(se.LogicalType.TIMESTAMP is not null)
return new DateTimeDataField(se.Name, DateTimeFormat.Timestamp, isAdjustedToUTC: se.LogicalType.TIMESTAMP.IsAdjustedToUTC, unit: se.LogicalType.TIMESTAMP.Unit.Convert());

switch(se.ConvertedType) {
case ConvertedType.TIMESTAMP_MILLIS:
if(se.Type == Type.INT64)
Expand Down Expand Up @@ -465,6 +470,25 @@ public static SchemaElement Encode(DataField field) {
tse.Type = Type.INT32;
tse.ConvertedType = ConvertedType.DATE;
break;
case DateTimeFormat.Timestamp:
tse.Type = Type.INT64;
tse.LogicalType = new LogicalType { TIMESTAMP = new TimestampType {
IsAdjustedToUTC = dfDateTime.IsAdjustedToUTC,
Unit = dfDateTime.Unit switch {
DateTimeTimeUnit.Millis => new TimeUnit {
MILLIS = new MilliSeconds(),
},
DateTimeTimeUnit.Micros => new TimeUnit {
MICROS = new MicroSeconds(),
},
DateTimeTimeUnit.Nanos => new TimeUnit {
NANOS = new NanoSeconds(),
},
_ => throw new ApplicationException($"Unexpected TimeUnit: {dfDateTime.Unit}")
}
}
};
break;
default:
tse.Type = Type.INT96;
break;
Expand Down
9 changes: 9 additions & 0 deletions src/Parquet/Extensions/OtherExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ static class OtherExtensions {
private static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
private const long UnixEpochMilliseconds = 62_135_596_800_000L;
private const long UnixEpochMicroseconds = 62_135_596_800_000_000L;

#if NET7_0_OR_GREATER
// Unix epoch expressed in nanoseconds since 0001-01-01. The exact product
// (epoch ticks * 100) is larger than long.MaxValue, so the multiplication is
// explicitly unchecked: the stored value is the product wrapped modulo 2^64 and
// is only meaningful when combined with other wrapping (unchecked) arithmetic.
// readonly: the value is a fixed constant and must never be reassigned.
private static readonly long UnixEpochNanoseconds = unchecked(UnixEpoch.Ticks * TimeSpan.NanosecondsPerTick);
#endif

public static DateTimeOffset FromUnixMilliseconds(this long unixMilliseconds) {
return UnixEpoch.AddMilliseconds(unixMilliseconds);
Expand All @@ -28,6 +32,11 @@ public static long ToUnixMicroseconds(this DateTime dto) {
long microseconds = dto.Ticks / TimeSpan.TicksPerMicrosecond;
return microseconds - UnixEpochMicroseconds;
}

/// <summary>
/// Converts a <see cref="DateTime"/> to the number of nanoseconds relative to the Unix epoch (1970-01-01T00:00:00).
/// </summary>
/// <param name="dto">Value to convert. Only its raw tick count is used; <see cref="DateTime.Kind"/> is not inspected here.</param>
/// <returns>Nanoseconds since the Unix epoch; negative for values before it.</returns>
/// <remarks>
/// The epoch is subtracted while the value is still expressed in ticks and only the difference is scaled
/// to nanoseconds. Scaling the absolute tick count first would exceed <see cref="long.MaxValue"/> for any
/// present-day date. A 64-bit nanosecond count can only span roughly the years 1677 to 2262; values outside
/// that range cannot be represented.
/// </remarks>
public static long ToUnixNanoseconds(this DateTime dto) {
    long ticksSinceEpoch = dto.Ticks - DateTime.UnixEpoch.Ticks;
    return ticksSinceEpoch * TimeSpan.NanosecondsPerTick;
}
#endif

public static DateTime AsUnixDaysInDateTime(this int unixDays) {
Expand Down
30 changes: 28 additions & 2 deletions src/Parquet/Schema/DateTimeDataField.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,49 @@ namespace Parquet.Schema {
/// <summary>
/// Schema element for <see cref="DateTime"/> that allows specifying the storage format and precision.
/// </summary>
public class DateTimeDataField : DataField {
public class DateTimeDataField : DataField {
/// <summary>
/// Desired data format, Parquet specific
/// </summary>
public DateTimeFormat DateTimeFormat { get; }

/// <summary>
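/// Whether timestamp values are adjusted to UTC. For the logical Timestamp format this is stored as the
/// logical type's IsAdjustedToUTC flag; on read, values get <see cref="DateTimeKind.Utc"/> when true and
/// <see cref="DateTimeKind.Local"/> when false.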
/// </summary>
public bool IsAdjustedToUTC { get; }

/// <summary>
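/// Precision (time unit) used to store the value. Forced by the constructor for legacy formats that have a fixed precision.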
/// </summary>
public DateTimeTimeUnit Unit { get; }

/// <summary>
/// Initializes a new instance of the <see cref="DateTimeDataField"/> class.
/// </summary>
/// <param name="name">The name.</param>
/// <param name="format">The format.</param>
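/// <param name="isAdjustedToUTC">Value for <see cref="IsAdjustedToUTC"/>. Defaults to true.</param>
/// <param name="unit">Requested precision. Ignored for legacy formats with a fixed precision (DateAndTime is always milliseconds; DateAndTimeMicros, where available, is always microseconds). Milliseconds are used when null.</param>
/// <param name="isNullable">Passed through to the base <see cref="DataField"/> constructor.</param>
/// <param name="isArray">Passed through to the base <see cref="DataField"/> constructor.</param>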
/// <param name="propertyName">When set, uses this property to get the field's data. When not set, uses the property that matches the name parameter.</param>
public DateTimeDataField(string name, DateTimeFormat format, bool? isNullable = null, bool? isArray = null, string? propertyName = null)
public DateTimeDataField(string name, DateTimeFormat format, bool isAdjustedToUTC = true, DateTimeTimeUnit? unit = null, bool? isNullable = null, bool? isArray = null, string? propertyName = null)
    : base(name, typeof(DateTime), isNullable, isArray, propertyName) {
    DateTimeFormat = format;
    IsAdjustedToUTC = isAdjustedToUTC;

    // Legacy formats carry a fixed precision, so any caller-supplied unit is
    // overridden for them; every other format honours the requested unit and
    // falls back to milliseconds when none is given.
    Unit = format switch {
        DateTimeFormat.DateAndTime => DateTimeTimeUnit.Millis,
#if NET7_0_OR_GREATER
        DateTimeFormat.DateAndTimeMicros => DateTimeTimeUnit.Micros,
#endif
        _ => unit ?? DateTimeTimeUnit.Millis,
    };
}
}
}
7 changes: 6 additions & 1 deletion src/Parquet/Schema/DateTimeFormat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ public enum DateTimeFormat {
/// <summary>
/// Only stores a date. Time portion is truncated. Internally stored as INT32
/// </summary>
Date
Date,

/// <summary>
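/// Stored as INT64 annotated with the Parquet TIMESTAMP logical type. The time unit and the
/// UTC-adjustment flag are taken from the data field.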
/// </summary>
Timestamp,
}
}
19 changes: 19 additions & 0 deletions src/Parquet/Schema/DateTimeTimeUnit.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace Parquet.Schema {
/// <summary>
/// Time unit (precision) of a timestamp value. A <see cref="DateTimeDataField"/> carries one of these
/// values, and it is translated to the corresponding Parquet time unit when the schema is encoded.
/// </summary>
public enum DateTimeTimeUnit {
/// <summary>
/// Millisecond precision.
/// </summary>
Millis,
/// <summary>
/// Microsecond precision.
/// </summary>
Micros,
/// <summary>
/// Nanosecond precision. Note that .NET's <see cref="System.DateTime"/> cannot represent full nanosecond
/// precision (one tick is 100 nanoseconds), so finer detail is lost.
/// </summary>
Nanos,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,31 @@ public class ParquetTimestampAttribute : Attribute {
/// Creates an instance of the attribute
/// </summary>
/// <param name="resolution"></param>
public ParquetTimestampAttribute(ParquetTimestampResolution resolution = ParquetTimestampResolution.Milliseconds) {
/// <param name="useLogicalTimestamp">When true, <see cref="GetDateTimeFormat"/> returns <see cref="DateTimeFormat.Timestamp"/> regardless of <paramref name="resolution"/>.</param>
/// <param name="isAdjustedToUTC">Value exposed through <see cref="IsAdjustedToUTC"/>. Defaults to true, the same default the DateTimeDataField constructor uses.</param>
public ParquetTimestampAttribute(ParquetTimestampResolution resolution = ParquetTimestampResolution.Milliseconds, bool useLogicalTimestamp = false, bool isAdjustedToUTC = true) {
    Resolution = resolution;
    UseLogicalTimestamp = useLogicalTimestamp;
    // IsAdjustedToUTC has a private setter, so the constructor is the only place
    // it can be given a value; without this assignment it would always be false.
    IsAdjustedToUTC = isAdjustedToUTC;
}

/// <summary>
/// Resolution of Parquet timestamp
/// </summary>
public ParquetTimestampResolution Resolution { get; private set; }

/// <summary>
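/// When true, the field uses the logical Timestamp format (<see cref="DateTimeFormat.Timestamp"/>)
/// instead of the legacy format derived from <see cref="Resolution"/>.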
/// </summary>
public bool UseLogicalTimestamp { get; private set; }

/// <summary>
/// IsAdjustedToUTC
/// </summary>
public bool IsAdjustedToUTC { get; private set; }

internal DateTimeFormat GetDateTimeFormat() {
if(UseLogicalTimestamp)
return DateTimeFormat.Timestamp;

return Resolution switch {
ParquetTimestampResolution.Milliseconds => DateTimeFormat.DateAndTime,
#if NET7_0_OR_GREATER
Expand Down
Loading

0 comments on commit f13d05c

Please sign in to comment.