Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Logical Timestamp #2

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Parquet.Test/Schema/SchemaTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ public async Task Column_called_root() {

[Fact]
public async Task ReadSchemaActuallyEqualToWriteSchema() {
var field = new DateTimeDataField("Date", DateTimeFormat.DateAndTime, true);
var field = new DateTimeDataField("Date", DateTimeFormat.DateAndTime, isNullable: true);
var schema = new ParquetSchema(field);

using var memoryStream = new MemoryStream();
Expand Down
12 changes: 12 additions & 0 deletions src/Parquet.Test/Serialisation/SchemaReflectorTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

public int[]? IntArray { get; set; }

public bool MarkerField;

Check warning on line 26 in src/Parquet.Test/Serialisation/SchemaReflectorTest.cs

View workflow job for this annotation

GitHub Actions / Build NuGet

Field 'SchemaReflectorTest.PocoClass.MarkerField' is never assigned to, and will always have its default value false

Check warning on line 26 in src/Parquet.Test/Serialisation/SchemaReflectorTest.cs

View workflow job for this annotation

GitHub Actions / Unit Tests (macos-latest)

Field 'SchemaReflectorTest.PocoClass.MarkerField' is never assigned to, and will always have its default value false

Check warning on line 26 in src/Parquet.Test/Serialisation/SchemaReflectorTest.cs

View workflow job for this annotation

GitHub Actions / Unit Tests (ubuntu-latest)

Field 'SchemaReflectorTest.PocoClass.MarkerField' is never assigned to, and will always have its default value false

Check warning on line 26 in src/Parquet.Test/Serialisation/SchemaReflectorTest.cs

View workflow job for this annotation

GitHub Actions / Unit Tests (windows-latest)

Field 'SchemaReflectorTest.PocoClass.MarkerField' is never assigned to, and will always have its default value false
}

[Fact]
Expand Down Expand Up @@ -323,6 +323,9 @@
[ParquetTimestamp]
public DateTime TimestampDate { get; set; }

[ParquetTimestamp(useLogicalTimestamp: true)]
public DateTime LogicalTimestampDate { get; set; }

[ParquetTimestamp]
public DateTime? NullableTimestampDate { get; set; }

Expand Down Expand Up @@ -383,6 +386,15 @@
Assert.Equal(DateTimeFormat.DateAndTime, ((DateTimeDataField)df).DateTimeFormat);
}

[Fact]
public void Type_DateTime_LogicalTimestamp() {
ParquetSchema s = typeof(DatesPoco).GetParquetSchema(true);

DataField df = s.FindDataField(nameof(DatesPoco.LogicalTimestampDate));
Assert.True(df is DateTimeDataField);
Assert.Equal(DateTimeFormat.Timestamp, ((DateTimeDataField)df).DateTimeFormat);
}

[Fact]
public void Type_DateTime_TimestampNullable() {
ParquetSchema s = typeof(DatesPoco).GetParquetSchema(true);
Expand Down
11 changes: 10 additions & 1 deletion src/Parquet.Test/Types/EndToEndTypeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class EndToEndTypeTest : TestBase {
["dateTime local kind"] = (new DataField<DateTime>("dateTime unknown kind"), new DateTime(2020, 06, 10, 11, 12, 13, DateTimeKind.Local)),
["impala date local kind"] = (new DateTimeDataField("dateImpala unknown kind", DateTimeFormat.Impala), new DateTime(2020, 06, 10, 11, 12, 13, DateTimeKind.Local)),
["dateDateAndTime local kind"] = (new DateTimeDataField("dateDateAndTime unknown kind", DateTimeFormat.DateAndTime), new DateTime(2020, 06, 10, 11, 12, 13, DateTimeKind.Local)),
["timestamp utc kind"] = (new DateTimeDataField("timestamp utc kind", DateTimeFormat.Timestamp, true), new DateTime(2020, 06, 10, 11, 12, 13, DateTimeKind.Utc)),
["timestamp local kind"] = (new DateTimeDataField("timestamp local kind", DateTimeFormat.Timestamp, false), new DateTime(2020, 06, 10, 11, 12, 13, DateTimeKind.Local)),
// don't want any excess info in the offset INT32 doesn't contain or care about this data
["dateDate"] = (new DateTimeDataField("dateDate", DateTimeFormat.Date), DateTime.UtcNow.RoundToDay()),
#if !NETCOREAPP3_1
Expand Down Expand Up @@ -84,7 +86,7 @@ public class EndToEndTypeTest : TestBase {
["unsigned long max value"] = (new DataField<ulong>("ulong"), ulong.MaxValue),

["nullable decimal"] = (new DecimalDataField("decimal?", 4, 1, true, true), null),
["nullable DateTime"] = (new DateTimeDataField("DateTime?", DateTimeFormat.DateAndTime, true), null),
["nullable DateTime"] = (new DateTimeDataField("DateTime?", DateTimeFormat.DateAndTime, isNullable: true), null),

["bool"] = (new DataField<bool>("bool"), true),
["nullable bool"] = (new DataField<bool?>("bool?"), new bool?(true)),
Expand Down Expand Up @@ -124,6 +126,8 @@ public class EndToEndTypeTest : TestBase {
[InlineData("dateTime local kind")]
[InlineData("impala date local kind")]
[InlineData("dateDateAndTime local kind")]
[InlineData("timestamp utc kind")]
[InlineData("timestamp local kind")]
[InlineData("dateDate")]
#if !NETCOREAPP3_1
[InlineData("dateOnly")]
Expand Down Expand Up @@ -174,6 +178,11 @@ public async Task Type_writes_and_reads_end_to_end(string name) {
equal = true;
else if(actual.GetType().IsArrayOf<byte>() && input.expectedValue != null) {
equal = ((byte[])actual).SequenceEqual((byte[])input.expectedValue);
} else if(input.field is DateTimeDataField { DateTimeFormat: DateTimeFormat.Timestamp }) {
var dtActual = (DateTime)actual;
var dtExpected = (DateTime)input.expectedValue!;
Assert.Equal(dtExpected.Kind, dtActual.Kind);
equal = dtActual.Equals(dtExpected);
} else if(actual.GetType() == typeof(DateTime)) {
var dtActual = (DateTime)actual;
Assert.Equal(DateTimeKind.Utc, dtActual.Kind);
Expand Down
49 changes: 47 additions & 2 deletions src/Parquet/Encodings/ParquetPlainEncoder.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Buffers;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Numerics;
Expand Down Expand Up @@ -822,7 +823,27 @@ public static void Encode(ReadOnlySpan<DateTime> data, Stream destination, Schem
}
break;
case TType.INT64:
if(tse.ConvertedType == ConvertedType.TIMESTAMP_MILLIS) {
if(tse.LogicalType?.TIMESTAMP is not null) {
foreach(DateTime element in data) {
if(tse.LogicalType.TIMESTAMP.Unit.MILLIS is not null) {
long unixTime = element.ToUnixMilliseconds();
byte[] raw = BitConverter.GetBytes(unixTime);
destination.Write(raw, 0, raw.Length);
#if NET7_0_OR_GREATER
} else if (tse.LogicalType.TIMESTAMP.Unit.MICROS is not null) {
long unixTime = element.ToUtc().ToUnixMicroseconds();
byte[] raw = BitConverter.GetBytes(unixTime);
destination.Write(raw, 0, raw.Length);
} else if (tse.LogicalType.TIMESTAMP.Unit.NANOS is not null) {
long unixTime = element.ToUtc().ToUnixNanoseconds();
byte[] raw = BitConverter.GetBytes(unixTime);
destination.Write(raw, 0, raw.Length);
#endif
} else {
throw new ParquetException($"Unexpected TimeUnit: {tse.LogicalType.TIMESTAMP.Unit}");
}
}
} else if(tse.ConvertedType == ConvertedType.TIMESTAMP_MILLIS) {
foreach(DateTime element in data) {
long unixTime = element.ToUtc().ToUnixMilliseconds();
byte[] raw = BitConverter.GetBytes(unixTime);
Expand Down Expand Up @@ -896,7 +917,31 @@ public static int Decode(Span<byte> source, Span<DateTime> data, SchemaElement t
long[] longs = ArrayPool<long>.Shared.Rent(data.Length);
try {
int longsRead = Decode(source, longs.AsSpan(0, data.Length));
if(tse.ConvertedType == ConvertedType.TIMESTAMP_MICROS) {
if(tse.LogicalType?.TIMESTAMP is not null) {
for(int i = 0; i < longsRead; i++) {
if(tse.LogicalType.TIMESTAMP.Unit.MILLIS is not null) {
DateTime dt = longs[i].AsUnixMillisecondsInDateTime();
dt = DateTime.SpecifyKind(dt, tse.LogicalType.TIMESTAMP.IsAdjustedToUTC ? DateTimeKind.Utc : DateTimeKind.Local);
data[i] = dt;
} else if(tse.LogicalType.TIMESTAMP.Unit.MICROS is not null) {
long lv = longs[i];
long microseconds = lv % 1000;
lv /= 1000;
DateTime dt = lv.AsUnixMillisecondsInDateTime().AddTicks(microseconds * 10);
dt = DateTime.SpecifyKind(dt, tse.LogicalType.TIMESTAMP.IsAdjustedToUTC ? DateTimeKind.Utc : DateTimeKind.Local);
data[i] = dt;
} else if(tse.LogicalType.TIMESTAMP.Unit.NANOS is not null) {
long lv = longs[i];
long nanoseconds = lv % 1000000;
lv /= 1000000;
DateTime dt = lv.AsUnixMillisecondsInDateTime().AddTicks(nanoseconds / 100); // 1 tick = 100 nanoseconds
dt = DateTime.SpecifyKind(dt, tse.LogicalType.TIMESTAMP.IsAdjustedToUTC ? DateTimeKind.Utc : DateTimeKind.Local);
data[i] = dt;
} else {
throw new ParquetException($"Unexpected TimeUnit: {tse.LogicalType.TIMESTAMP.Unit}");
}
}
} else if(tse.ConvertedType == ConvertedType.TIMESTAMP_MICROS) {
for(int i = 0; i < longsRead; i++) {
long lv = longs[i];
long microseconds = lv % 1000;
Expand Down
26 changes: 25 additions & 1 deletion src/Parquet/Encodings/SchemaEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Parquet.File.Values.Primitives;
using Parquet.Meta;
using Parquet.Schema;
using Parquet.Serialization;
using SType = System.Type;
using Type = Parquet.Meta.Type;

Expand Down Expand Up @@ -201,7 +202,7 @@ private static bool TryBuildDataField(SchemaElement se, ParquetOptions options,
_ => typeof(int)
},
Type.INT32 => typeof(int),

Type.INT64 when se.LogicalType?.TIMESTAMP != null => typeof(DateTime),
Type.INT64 when se.ConvertedType != null => se.ConvertedType switch {
ConvertedType.INT_64 => typeof(long),
ConvertedType.UINT_64 => typeof(ulong),
Expand Down Expand Up @@ -267,6 +268,10 @@ private static DataField GetDecimalDataField(SchemaElement se) =>
se.Scale.GetValueOrDefault(DecimalFormatDefaults.DefaultScale));

private static DataField GetDateTimeDataField(SchemaElement se) {
if(se.LogicalType is not null)
if(se.LogicalType.TIMESTAMP is not null)
return new DateTimeDataField(se.Name, DateTimeFormat.Timestamp, isAdjustedToUTC: se.LogicalType.TIMESTAMP.IsAdjustedToUTC, unit: se.LogicalType.TIMESTAMP.Unit.Convert());

switch(se.ConvertedType) {
case ConvertedType.TIMESTAMP_MILLIS:
if(se.Type == Type.INT64)
Expand Down Expand Up @@ -465,6 +470,25 @@ public static SchemaElement Encode(DataField field) {
tse.Type = Type.INT32;
tse.ConvertedType = ConvertedType.DATE;
break;
case DateTimeFormat.Timestamp:
tse.Type = Type.INT64;
tse.LogicalType = new LogicalType { TIMESTAMP = new TimestampType {
IsAdjustedToUTC = dfDateTime.IsAdjustedToUTC,
Unit = dfDateTime.Unit switch {
DateTimeTimeUnit.Millis => new TimeUnit {
MILLIS = new MilliSeconds(),
},
DateTimeTimeUnit.Micros => new TimeUnit {
MICROS = new MicroSeconds(),
},
DateTimeTimeUnit.Nanos => new TimeUnit {
NANOS = new NanoSeconds(),
},
_ => throw new ParquetException($"Unexpected TimeUnit: {dfDateTime.Unit}")
}
}
};
break;
default:
tse.Type = Type.INT96;
break;
Expand Down
9 changes: 9 additions & 0 deletions src/Parquet/Extensions/OtherExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ static class OtherExtensions {
private static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
private const long UnixEpochMilliseconds = 62_135_596_800_000L;
private const long UnixEpochMicroseconds = 62_135_596_800_000_000L;

#if NET7_0_OR_GREATER
private static long UnixEpochNanoseconds = UnixEpoch.Ticks * TimeSpan.NanosecondsPerTick;
#endif

public static DateTimeOffset FromUnixMilliseconds(this long unixMilliseconds) {
return UnixEpoch.AddMilliseconds(unixMilliseconds);
Expand All @@ -28,6 +32,11 @@ public static long ToUnixMicroseconds(this DateTime dto) {
long microseconds = dto.Ticks / TimeSpan.TicksPerMicrosecond;
return microseconds - UnixEpochMicroseconds;
}

public static long ToUnixNanoseconds(this DateTime dto) {
long nanoseconds = dto.Ticks * TimeSpan.NanosecondsPerTick;
return nanoseconds - UnixEpochNanoseconds;
}
#endif

public static DateTime AsUnixDaysInDateTime(this int unixDays) {
Expand Down
28 changes: 27 additions & 1 deletion src/Parquet/Schema/DateTimeDataField.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,44 @@ public class DateTimeDataField : DataField {
/// Desired data format, Parquet specific
/// </summary>
public DateTimeFormat DateTimeFormat { get; }

/// <summary>
/// IsAdjustedToUTC
/// </summary>
public bool IsAdjustedToUTC { get; }

/// <summary>
/// TimeUnit
/// </summary>
public DateTimeTimeUnit Unit { get; }

/// <summary>
/// Initializes a new instance of the <see cref="DateTimeDataField"/> class.
/// </summary>
/// <param name="name">The name.</param>
/// <param name="format">The format.</param>
/// <param name="isAdjustedToUTC"></param>
/// <param name="unit"></param>
/// <param name="isNullable"></param>
/// <param name="isArray"></param>
/// <param name="propertyName">When set, uses this property to get the field's data. When not set, uses the property that matches the name parameter.</param>
public DateTimeDataField(string name, DateTimeFormat format, bool? isNullable = null, bool? isArray = null, string? propertyName = null)
public DateTimeDataField(string name, DateTimeFormat format, bool isAdjustedToUTC = true, DateTimeTimeUnit? unit = null, bool? isNullable = null, bool? isArray = null, string? propertyName = null)
: base(name, typeof(DateTime), isNullable, isArray, propertyName) {
DateTimeFormat = format;
IsAdjustedToUTC = isAdjustedToUTC;

// Override the unit for legacy types
if(format == DateTimeFormat.DateAndTime) {
Unit = DateTimeTimeUnit.Millis;
}
#if NET7_0_OR_GREATER
else if(format == DateTimeFormat.DateAndTimeMicros) {
Unit = DateTimeTimeUnit.Micros;
}
#endif
else {
Unit = unit ?? DateTimeTimeUnit.Millis;
}
}
}
}
7 changes: 6 additions & 1 deletion src/Parquet/Schema/DateTimeFormat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ public enum DateTimeFormat {
/// <summary>
/// Only stores a date. Time portion is truncated. Internally stored as INT32
/// </summary>
Date
Date,

/// <summary>
/// Logical Type Timestamp.
/// </summary>
Timestamp,
}
}
19 changes: 19 additions & 0 deletions src/Parquet/Schema/DateTimeTimeUnit.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace Parquet.Schema {
/// <summary>
/// Support Time/Timestamp Units
/// </summary>
public enum DateTimeTimeUnit {
/// <summary>
/// Millisecond Precision
/// </summary>
Millis,
/// <summary>
/// Microsecond Precision
/// </summary>
Micros,
/// <summary>
/// Nanosecond Precision, note dotnet does not support full Nano second precision for DateTime
/// </summary>
Nanos,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,31 @@ public class ParquetTimestampAttribute : Attribute {
/// Creates an instance of the attribute
/// </summary>
/// <param name="resolution"></param>
public ParquetTimestampAttribute(ParquetTimestampResolution resolution = ParquetTimestampResolution.Milliseconds) {
/// <param name="useLogicalTimestamp"></param>
public ParquetTimestampAttribute(ParquetTimestampResolution resolution = ParquetTimestampResolution.Milliseconds, bool useLogicalTimestamp = false) {
Resolution = resolution;
UseLogicalTimestamp = useLogicalTimestamp;
}

/// <summary>
/// Resolution of Parquet timestamp
/// </summary>
public ParquetTimestampResolution Resolution { get; private set; }

/// <summary>
/// Resolution of Parquet timestamp
/// </summary>
public bool UseLogicalTimestamp { get; private set; }

/// <summary>
/// IsAdjustedToUTC
/// </summary>
public bool IsAdjustedToUTC { get; private set; }

internal DateTimeFormat GetDateTimeFormat() {
if(UseLogicalTimestamp)
return DateTimeFormat.Timestamp;

return Resolution switch {
ParquetTimestampResolution.Milliseconds => DateTimeFormat.DateAndTime,
#if NET7_0_OR_GREATER
Expand Down
44 changes: 43 additions & 1 deletion src/Parquet/Serialization/TypeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ private static Field ConstructDataField(string name, string propertyName, Type t
ParquetTimestampAttribute? tsa = member?.TimestampAttribute;
r = new DateTimeDataField(name,
tsa == null ? DateTimeFormat.Impala : tsa.GetDateTimeFormat(),
t == typeof(DateTime?), null, propertyName);
isAdjustedToUTC: tsa == null ? true : tsa.IsAdjustedToUTC,
unit: tsa?.Resolution.Convert(),
isNullable: t == typeof(DateTime?), null, propertyName);
} else if(t == typeof(TimeSpan) || t == typeof(TimeSpan?)) {
r = new TimeSpanDataField(name,
member?.MicroSecondsTimeAttribute == null
Expand Down Expand Up @@ -272,5 +274,45 @@ private static ParquetSchema CreateSchema(Type t, bool forWriting) {

return new ParquetSchema(fields);
}

/// <summary>
/// Convert Resolution to TimeUnit
/// </summary>
/// <param name="resolution"></param>
/// <returns></returns>
public static DateTimeTimeUnit Convert(this ParquetTimestampResolution resolution) {
switch(resolution) {
case ParquetTimestampResolution.Milliseconds:
return DateTimeTimeUnit.Millis;
#if NET7_0_OR_GREATER
case ParquetTimestampResolution.Microseconds:
return DateTimeTimeUnit.Micros;
#endif
default:
throw new ParquetException($"Unexpected Resolution: {resolution}");
// nanoseconds to be added
}
}

/// <summary>
/// Convert Parquet TimeUnit to TimeUnit
/// </summary>
/// <param name="unit"></param>
/// <returns></returns>
public static DateTimeTimeUnit Convert(this Parquet.Meta.TimeUnit unit) {
if(unit.MILLIS is not null) {
return DateTimeTimeUnit.Millis;
}

if(unit.MICROS is not null) {
return DateTimeTimeUnit.Micros;
}

if(unit.NANOS is not null) {
return DateTimeTimeUnit.Nanos;
}

throw new ParquetException($"Unexpected TimeUnit: {unit}");
}
}
}
Loading