From 90684dc750ce755170689fc3c4582d74c1f9636b Mon Sep 17 00:00:00 2001 From: Aleksei Smirnov Date: Fri, 12 May 2023 23:11:42 +0300 Subject: [PATCH 1/2] Add missing implementation for datetime relevant arrow type --- eng/Versions.props | 2 +- .../DataFrame.Arrow.cs | 15 ++++- src/Microsoft.Data.Analysis/DataFrame.IO.cs | 2 +- .../PrimitiveColumnContainer.cs | 12 ---- .../PrimitiveDataFrameColumn.cs | 57 ++++++++++++++----- .../ArrowIntegrationTests.cs | 1 + .../DataFrame.IOTests.cs | 3 +- 7 files changed, 60 insertions(+), 32 deletions(-) diff --git a/eng/Versions.props b/eng/Versions.props index 0c6252b6f5..dae39df184 100644 --- a/eng/Versions.props +++ b/eng/Versions.props @@ -30,7 +30,7 @@ 6.0.1 4.7.1 - 2.0.0 + 11.0.0 3.19.6 2.3.1 3.3.0 diff --git a/src/Microsoft.Data.Analysis/DataFrame.Arrow.cs b/src/Microsoft.Data.Analysis/DataFrame.Arrow.cs index dc5be46421..2d67b9ee78 100644 --- a/src/Microsoft.Data.Analysis/DataFrame.Arrow.cs +++ b/src/Microsoft.Data.Analysis/DataFrame.Arrow.cs @@ -101,10 +101,18 @@ private static void AppendDataFrameColumnFromArrowArray(Field field, IArrowArray AppendDataFrameColumnFromArrowArray(fieldsEnumerator.Current, structArrayEnumerator.Current, ret, field.Name + "_"); } break; - case ArrowTypeId.Decimal: + case ArrowTypeId.Date64: + Date64Array arrowDate64Array = (Date64Array)arrowArray; + dataFrameColumn = new DateTimeDataFrameColumn(fieldName, arrowDate64Array.Data.Length); + for (int i = 0; i < arrowDate64Array.Data.Length; i++) + { + dataFrameColumn[i] = arrowDate64Array.GetDateTime(i); + } + break; + case ArrowTypeId.Decimal128: + case ArrowTypeId.Decimal256: case ArrowTypeId.Binary: case ArrowTypeId.Date32: - case ArrowTypeId.Date64: case ArrowTypeId.Dictionary: case ArrowTypeId.FixedSizedBinary: case ArrowTypeId.HalfFloat: @@ -114,6 +122,7 @@ private static void AppendDataFrameColumnFromArrowArray(Field field, IArrowArray case ArrowTypeId.Null: case ArrowTypeId.Time32: case ArrowTypeId.Time64: + case ArrowTypeId.Timestamp: default: throw new NotImplementedException($"{fieldType.Name}"); } @@ -145,7 +154,7 @@ public static DataFrame FromArrowRecordBatch(RecordBatch recordBatch) } /// - /// Returns an without copying data + /// Returns an mostly without copying data /// public IEnumerable ToArrowRecordBatches() { diff --git a/src/Microsoft.Data.Analysis/DataFrame.IO.cs b/src/Microsoft.Data.Analysis/DataFrame.IO.cs index e525624998..4a57397588 100644 --- a/src/Microsoft.Data.Analysis/DataFrame.IO.cs +++ b/src/Microsoft.Data.Analysis/DataFrame.IO.cs @@ -336,7 +336,7 @@ private static DataFrameColumn CreateColumn(Type kind, string columnName) } else if (kind == typeof(DateTime)) { - ret = new PrimitiveDataFrameColumn(columnName); + ret = new DateTimeDataFrameColumn(columnName); } else { diff --git a/src/Microsoft.Data.Analysis/PrimitiveColumnContainer.cs b/src/Microsoft.Data.Analysis/PrimitiveColumnContainer.cs index 830440445e..022a6cdfe4 100644 --- a/src/Microsoft.Data.Analysis/PrimitiveColumnContainer.cs +++ b/src/Microsoft.Data.Analysis/PrimitiveColumnContainer.cs @@ -374,18 +374,6 @@ internal int MaxRecordBatchLength(long startIndex) return Buffers[arrayIndex].Length - (int)startIndex; } - internal ReadOnlyMemory GetValueBuffer(long startIndex) - { - int arrayIndex = GetArrayContainingRowIndex(startIndex); - return Buffers[arrayIndex].ReadOnlyBuffer; - } - - internal ReadOnlyMemory GetNullBuffer(long startIndex) - { - int arrayIndex = GetArrayContainingRowIndex(startIndex); - return NullBitMapBuffers[arrayIndex].ReadOnlyBuffer; - } - public IReadOnlyList this[long startIndex, int length] { get diff --git a/src/Microsoft.Data.Analysis/PrimitiveDataFrameColumn.cs b/src/Microsoft.Data.Analysis/PrimitiveDataFrameColumn.cs index 113b67cc1c..a0073e7a6c 100644 --- a/src/Microsoft.Data.Analysis/PrimitiveDataFrameColumn.cs +++ b/src/Microsoft.Data.Analysis/PrimitiveDataFrameColumn.cs @@ -6,6 +6,7 @@ using System.Collections; using System.Collections.Generic; using System.Diagnostics; +using System.Runtime.InteropServices; using Apache.Arrow; using Apache.Arrow.Types; using Microsoft.ML; @@ -103,6 +104,8 @@ private IArrowType GetArrowType() return UInt64Type.Default; else if (typeof(T) == typeof(ushort)) return UInt16Type.Default; + else if (typeof(T) == typeof(DateTime)) + return Date64Type.Default; else throw new NotImplementedException(nameof(T)); } @@ -126,36 +129,64 @@ protected internal override Apache.Arrow.Array ToArrowArray(long startIndex, int { int arrayIndex = numberOfRows == 0 ? 0 : _columnContainer.GetArrayContainingRowIndex(startIndex); int offset = (int)(startIndex - arrayIndex * ReadOnlyDataFrameBuffer.MaxCapacity); + if (numberOfRows != 0 && numberOfRows > _columnContainer.Buffers[arrayIndex].Length - offset) { throw new ArgumentException(Strings.SpansMultipleBuffers, nameof(numberOfRows)); } - ArrowBuffer valueBuffer = numberOfRows == 0 ? ArrowBuffer.Empty : new ArrowBuffer(_columnContainer.GetValueBuffer(startIndex)); - ArrowBuffer nullBuffer = numberOfRows == 0 ? ArrowBuffer.Empty : new ArrowBuffer(_columnContainer.GetNullBuffer(startIndex)); + int nullCount = GetNullCount(startIndex, numberOfRows); + + //DateTime requires convertion + if (this.DataType == typeof(DateTime)) + { + if (numberOfRows == 0) + return new Date64Array(ArrowBuffer.Empty, ArrowBuffer.Empty, numberOfRows, nullCount, offset); + + ReadOnlyDataFrameBuffer valueBuffer = (numberOfRows == 0) ? null : _columnContainer.Buffers[arrayIndex]; + ReadOnlyDataFrameBuffer nullBuffer = (numberOfRows == 0) ? null : _columnContainer.NullBitMapBuffers[arrayIndex]; + + ReadOnlySpan valueSpan = MemoryMarshal.Cast(valueBuffer.ReadOnlySpan); + Date64Array.Builder builder = new Date64Array.Builder().Reserve(valueBuffer.Length); + + for (int i = 0; i < valueBuffer.Length; i++) + { + if (BitUtility.GetBit(nullBuffer.ReadOnlySpan, i)) + builder.Append(valueSpan[i]); + else + builder.AppendNull(); + } + + return builder.Build(); + } + + //No convertion + ArrowBuffer arrowValueBuffer = numberOfRows == 0 ? ArrowBuffer.Empty : new ArrowBuffer(_columnContainer.Buffers[arrayIndex].ReadOnlyBuffer); + ArrowBuffer arrowNullBuffer = numberOfRows == 0 ? ArrowBuffer.Empty : new ArrowBuffer(_columnContainer.NullBitMapBuffers[arrayIndex].ReadOnlyBuffer); + Type type = this.DataType; if (type == typeof(bool)) - return new BooleanArray(valueBuffer, nullBuffer, numberOfRows, nullCount, offset); + return new BooleanArray(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset); else if (type == typeof(double)) - return new DoubleArray(valueBuffer, nullBuffer, numberOfRows, nullCount, offset); + return new DoubleArray(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset); else if (type == typeof(float)) - return new FloatArray(valueBuffer, nullBuffer, numberOfRows, nullCount, offset); + return new FloatArray(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset); else if (type == typeof(int)) - return new Int32Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset); + return new Int32Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset); else if (type == typeof(long)) - return new Int64Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset); + return new Int64Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset); else if (type == typeof(sbyte)) - return new Int8Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset); + return new Int8Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset); else if (type == typeof(short)) - return new Int16Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset); + return new Int16Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset); else if (type == typeof(uint)) - return new UInt32Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset); + return new UInt32Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset); else if (type == typeof(ulong)) - return new UInt64Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset); + return new UInt64Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset); else if (type == typeof(ushort)) - return new UInt16Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset); + return new UInt16Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset); else if (type == typeof(byte)) - return new UInt8Array(valueBuffer, nullBuffer, numberOfRows, nullCount, offset); + return new UInt8Array(arrowValueBuffer, arrowNullBuffer, numberOfRows, nullCount, offset); else throw new NotImplementedException(type.ToString()); } diff --git a/test/Microsoft.Data.Analysis.Tests/ArrowIntegrationTests.cs b/test/Microsoft.Data.Analysis.Tests/ArrowIntegrationTests.cs index dacf43a8db..185ab835bb 100644 --- a/test/Microsoft.Data.Analysis.Tests/ArrowIntegrationTests.cs +++ b/test/Microsoft.Data.Analysis.Tests/ArrowIntegrationTests.cs @@ -48,6 +48,7 @@ public void TestArrowIntegration() .Append("ULongColumn", false, new UInt64Array.Builder().AppendRange(Enumerable.Repeat((ulong)1, 10)).Build()) .Append("ByteColumn", false, new Int8Array.Builder().AppendRange(Enumerable.Repeat((sbyte)1, 10)).Build()) .Append("UByteColumn", false, new UInt8Array.Builder().AppendRange(Enumerable.Repeat((byte)1, 10)).Build()) + .Append("Date64Column", false, new Date64Array.Builder().AppendRange(Enumerable.Repeat(DateTime.Now, 10)).Build()) .Build(); DataFrame df = DataFrame.FromArrowRecordBatch(originalBatch); diff --git a/test/Microsoft.Data.Analysis.Tests/DataFrame.IOTests.cs b/test/Microsoft.Data.Analysis.Tests/DataFrame.IOTests.cs index 398e849907..5c33e199d7 100644 --- a/test/Microsoft.Data.Analysis.Tests/DataFrame.IOTests.cs +++ b/test/Microsoft.Data.Analysis.Tests/DataFrame.IOTests.cs @@ -13,7 +13,6 @@ using System.Data.SQLite; using System.Data.SQLite.EF6; using Xunit; -using Microsoft.ML.TestFramework.Attributes; namespace Microsoft.Data.Analysis.Tests { @@ -102,7 +101,7 @@ internal static void VerifyColumnTypes(DataFrame df, bool testArrowStringColumn } else if (dataType == typeof(DateTime)) { - Assert.IsType>(column); + Assert.IsType(column); } else { From bba95f553f5cb7b0d79eea6acedc2cc3f57a0aaf Mon Sep 17 00:00:00 2001 From: Aleksei Smirnov Date: Wed, 17 May 2023 23:04:28 +0300 Subject: [PATCH 2/2] Return required usage --- test/Microsoft.Data.Analysis.Tests/DataFrame.IOTests.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/test/Microsoft.Data.Analysis.Tests/DataFrame.IOTests.cs b/test/Microsoft.Data.Analysis.Tests/DataFrame.IOTests.cs index 5c33e199d7..09e95bcae1 100644 --- a/test/Microsoft.Data.Analysis.Tests/DataFrame.IOTests.cs +++ b/test/Microsoft.Data.Analysis.Tests/DataFrame.IOTests.cs @@ -13,6 +13,7 @@ using System.Data.SQLite; using System.Data.SQLite.EF6; using Xunit; +using Microsoft.ML.TestFramework.Attributes; namespace Microsoft.Data.Analysis.Tests {