Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing implementation for datetime relevant arrow type into dataframe #6675

Merged
merged 7 commits into from
Jul 6, 2023
2 changes: 1 addition & 1 deletion eng/Versions.props
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
<SystemTextJsonVersion>6.0.1</SystemTextJsonVersion>
<SystemThreadingChannelsVersion>4.7.1</SystemThreadingChannelsVersion>
<!-- Other product dependencies -->
<ApacheArrowVersion>2.0.0</ApacheArrowVersion>
<ApacheArrowVersion>11.0.0</ApacheArrowVersion>
<GoogleProtobufVersion>3.19.6</GoogleProtobufVersion>
<LightGBMVersion>2.3.1</LightGBMVersion>
<MicrosoftCodeAnalysisAnalyzersVersion>3.3.0</MicrosoftCodeAnalysisAnalyzersVersion>
Expand Down
15 changes: 12 additions & 3 deletions src/Microsoft.Data.Analysis/DataFrame.Arrow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,18 @@ private static void AppendDataFrameColumnFromArrowArray(Field field, IArrowArray
AppendDataFrameColumnFromArrowArray(fieldsEnumerator.Current, structArrayEnumerator.Current, ret, field.Name + "_");
}
break;
case ArrowTypeId.Decimal:
case ArrowTypeId.Date64:
Date64Array arrowDate64Array = (Date64Array)arrowArray;
dataFrameColumn = new DateTimeDataFrameColumn(fieldName, arrowDate64Array.Data.Length);
for (int i = 0; i < arrowDate64Array.Data.Length; i++)
{
dataFrameColumn[i] = arrowDate64Array.GetDateTime(i);
}
break;
case ArrowTypeId.Decimal128:
case ArrowTypeId.Decimal256:
case ArrowTypeId.Binary:
case ArrowTypeId.Date32:
case ArrowTypeId.Date64:
case ArrowTypeId.Dictionary:
case ArrowTypeId.FixedSizedBinary:
case ArrowTypeId.HalfFloat:
Expand All @@ -114,6 +122,7 @@ private static void AppendDataFrameColumnFromArrowArray(Field field, IArrowArray
case ArrowTypeId.Null:
case ArrowTypeId.Time32:
case ArrowTypeId.Time64:
case ArrowTypeId.Timestamp:
default:
throw new NotImplementedException($"{fieldType.Name}");
}
Expand Down Expand Up @@ -145,7 +154,7 @@ public static DataFrame FromArrowRecordBatch(RecordBatch recordBatch)
}

/// <summary>
/// Returns an <see cref="IEnumerable{RecordBatch}"/> without copying data
/// Returns an <see cref="IEnumerable{RecordBatch}"/> mostly without copying data
/// </summary>
public IEnumerable<RecordBatch> ToArrowRecordBatches()
{
Expand Down
2 changes: 1 addition & 1 deletion src/Microsoft.Data.Analysis/DataFrame.IO.cs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ private static DataFrameColumn CreateColumn(Type kind, string columnName)
}
else if (kind == typeof(DateTime))
{
ret = new PrimitiveDataFrameColumn<DateTime>(columnName);
ret = new DateTimeDataFrameColumn(columnName);
}
else
{
Expand Down
12 changes: 0 additions & 12 deletions src/Microsoft.Data.Analysis/PrimitiveColumnContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -374,18 +374,6 @@ internal int MaxRecordBatchLength(long startIndex)
return Buffers[arrayIndex].Length - (int)startIndex;
}

internal ReadOnlyMemory<byte> GetValueBuffer(long startIndex)
{
int arrayIndex = GetArrayContainingRowIndex(startIndex);
return Buffers[arrayIndex].ReadOnlyBuffer;
}

internal ReadOnlyMemory<byte> GetNullBuffer(long startIndex)
{
int arrayIndex = GetArrayContainingRowIndex(startIndex);
return NullBitMapBuffers[arrayIndex].ReadOnlyBuffer;
}

public IReadOnlyList<T?> this[long startIndex, int length]
{
get
Expand Down
57 changes: 44 additions & 13 deletions src/Microsoft.Data.Analysis/PrimitiveDataFrameColumn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;
using Apache.Arrow;
using Apache.Arrow.Types;
using Microsoft.ML;
Expand Down Expand Up @@ -103,6 +104,8 @@ private IArrowType GetArrowType()
return UInt64Type.Default;
else if (typeof(T) == typeof(ushort))
return UInt16Type.Default;
else if (typeof(T) == typeof(DateTime))
return Date64Type.Default;
else
throw new NotImplementedException(nameof(T));
}
Expand All @@ -126,36 +129,64 @@ protected internal override Apache.Arrow.Array ToArrowArray(long startIndex, int
{
int arrayIndex = numberOfRows == 0 ? 0 : _columnContainer.GetArrayContainingRowIndex(startIndex);
int offset = (int)(startIndex - arrayIndex * ReadOnlyDataFrameBuffer<T>.MaxCapacity);

if (numberOfRows != 0 && numberOfRows > _columnContainer.Buffers[arrayIndex].Length - offset)
{
throw new ArgumentException(Strings.SpansMultipleBuffers, nameof(numberOfRows));
}
ArrowBuffer valueBuffer = numberOfRows == 0 ? ArrowBuffer.Empty : new ArrowBuffer(_columnContainer.GetValueBuffer(startIndex));
ArrowBuffer nullBuffer = numberOfRows == 0 ? ArrowBuffer.Empty : new ArrowBuffer(_columnContainer.GetNullBuffer(startIndex));

int nullCount = GetNullCount(startIndex, numberOfRows);

//DateTime requires convertion
if (this.DataType == typeof(DateTime))
{
if (numberOfRows == 0)
return new Date64Array(ArrowBuffer.Empty, ArrowBuffer.Empty, numberOfRows, nullCount, offset);

ReadOnlyDataFrameBuffer<T> valueBuffer = (numberOfRows == 0) ? null : _columnContainer.Buffers[arrayIndex];
ReadOnlyDataFrameBuffer<byte> nullBuffer = (numberOfRows == 0) ? null : _columnContainer.NullBitMapBuffers[arrayIndex];

ReadOnlySpan<DateTime> valueSpan = MemoryMarshal.Cast<T, DateTime>(valueBuffer.ReadOnlySpan);
Date64Array.Builder builder = new Date64Array.Builder().Reserve(valueBuffer.Length);

for (int i = 0; i < valueBuffer.Length; i++)
{
if (BitUtility.GetBit(nullBuffer.ReadOnlySpan, i))
builder.Append(valueSpan[i]);
else
builder.AppendNull();
}

return builder.Build();
}

//No convertion
ArrowBuffer arrowValueBuffer = numberOfRows == 0 ? ArrowBuffer.Empty : new ArrowBuffer(_columnContainer.Buffers[arrayIndex].ReadOnlyBuffer);
ArrowBuffer arrowNullBuffer = numberOfRows == 0 ? ArrowBuffer.Empty : new ArrowBuffer(_columnContainer.NullBitMapBuffers[arrayIndex].ReadOnlyBuffer);

Type type = this.DataType;
if (type == typeof(bool))
return new BooleanArray(valueBuffer, nullBuffer, numberOfRows, nullCount, offset);
return new BooleanArray(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
else if (type == typeof(double))
return new DoubleArray(valueBuffer, nullBuffer, numberOfRows, nullCount, offset);
return new DoubleArray(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
else if (type == typeof(float))
return new FloatArray(valueBuffer, nullBuffer, numberOfRows, nullCount, offset);
return new FloatArray(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
else if (type == typeof(int))
return new Int32Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset);
return new Int32Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
else if (type == typeof(long))
return new Int64Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset);
return new Int64Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
else if (type == typeof(sbyte))
return new Int8Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset);
return new Int8Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
else if (type == typeof(short))
return new Int16Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset);
return new Int16Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
else if (type == typeof(uint))
return new UInt32Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset);
return new UInt32Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
else if (type == typeof(ulong))
return new UInt64Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset);
return new UInt64Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
else if (type == typeof(ushort))
return new UInt16Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset);
return new UInt16Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
else if (type == typeof(byte))
return new UInt8Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset);
return new UInt8Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset);
else
throw new NotImplementedException(type.ToString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public void TestArrowIntegration()
.Append("ULongColumn", false, new UInt64Array.Builder().AppendRange(Enumerable.Repeat((ulong)1, 10)).Build())
.Append("ByteColumn", false, new Int8Array.Builder().AppendRange(Enumerable.Repeat((sbyte)1, 10)).Build())
.Append("UByteColumn", false, new UInt8Array.Builder().AppendRange(Enumerable.Repeat((byte)1, 10)).Build())
.Append("Date64Column", false, new Date64Array.Builder().AppendRange(Enumerable.Repeat(DateTime.Now, 10)).Build())
.Build();

DataFrame df = DataFrame.FromArrowRecordBatch(originalBatch);
Expand Down
2 changes: 1 addition & 1 deletion test/Microsoft.Data.Analysis.Tests/DataFrame.IOTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ internal static void VerifyColumnTypes(DataFrame df, bool testArrowStringColumn
}
else if (dataType == typeof(DateTime))
{
Assert.IsType<PrimitiveDataFrameColumn<DateTime>>(column);
Assert.IsType<DateTimeDataFrameColumn>(column);
}
else
{
Expand Down