Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-35809: [C#] Improvements to the C Data Interface #35810

Merged
merged 15 commits into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions csharp/src/Apache.Arrow/Apache.Arrow.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,8 @@
<ItemGroup Condition="!$([MSBuild]::IsTargetFrameworkCompatible($(TargetFramework), 'net5.0'))">
<Compile Remove="Arrays\HalfFloatArray.cs" />
</ItemGroup>
<ItemGroup Condition="$([MSBuild]::IsTargetFrameworkCompatible($(TargetFramework), 'net5.0'))">
<!-- Code targeting .NET 5+ should use [UnmanagedCallersOnly]. -->
<Compile Remove="C\NativeDelegate.cs" />
</ItemGroup>
</Project>
17 changes: 6 additions & 11 deletions csharp/src/Apache.Arrow/C/CArrowArray.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ public unsafe struct CArrowArray
public byte** buffers;
public CArrowArray** children;
public CArrowArray* dictionary;
public delegate* unmanaged[Stdcall]<CArrowArray*, void> release;
internal delegate* unmanaged
#if !NET5_0_OR_GREATER
[Cdecl]
#endif
<CArrowArray*, void> release;
public void* private_data;

/// <summary>
Expand All @@ -51,16 +55,7 @@ public unsafe struct CArrowArray
{
var ptr = (CArrowArray*)Marshal.AllocHGlobal(sizeof(CArrowArray));

ptr->length = 0;
ptr->n_buffers = 0;
ptr->offset = 0;
ptr->buffers = null;
ptr->n_children = 0;
ptr->children = null;
ptr->dictionary = null;
ptr->null_count = 0;
ptr->release = null;
ptr->private_data = null;
*ptr = default;

return ptr;
}
Expand Down
28 changes: 18 additions & 10 deletions csharp/src/Apache.Arrow/C/CArrowArrayExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,21 @@


using System;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using Apache.Arrow.Memory;

namespace Apache.Arrow.C
{
public static class CArrowArrayExporter
{
#if NET5_0_OR_GREATER
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps add a comment explaining why this needs .NET 5.0+? Or will it be obvious to a C# developer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UnmanagedCallersOnlyAttribute was introduced in .NET 5.

private static unsafe delegate* unmanaged<CArrowArray*, void> ReleaseArrayPtr => &ReleaseArray;
#else
private unsafe delegate void ReleaseArrowArray(CArrowArray* cArray);
private static unsafe readonly NativeDelegate<ReleaseArrowArray> s_releaseArray = new NativeDelegate<ReleaseArrowArray>(ReleaseArray);

private static unsafe delegate* unmanaged[Cdecl]<CArrowArray*, void> ReleaseArrayPtr => (delegate* unmanaged[Cdecl]<CArrowArray*, void>)s_releaseArray.Pointer;
#endif
/// <summary>
/// Export an <see cref="IArrowArray"/> to a <see cref="CArrowArray"/>. Whether or not the
/// export succeeds, the original array becomes invalid. Clone an array to continue using it
Expand Down Expand Up @@ -54,7 +59,7 @@ public static unsafe void ExportArray(IArrowArray array, CArrowArray* cArray)
try
{
ConvertArray(allocationOwner, array.Data, cArray);
cArray->release = (delegate* unmanaged[Stdcall]<CArrowArray*, void>)(IntPtr)s_releaseArray.Pointer;
cArray->release = ReleaseArrayPtr;
cArray->private_data = FromDisposable(allocationOwner);
allocationOwner = null;
}
Expand Down Expand Up @@ -97,7 +102,7 @@ public static unsafe void ExportRecordBatch(RecordBatch batch, CArrowArray* cArr
try
{
ConvertRecordBatch(allocationOwner, batch, cArray);
cArray->release = (delegate* unmanaged[Stdcall]<CArrowArray*, void>)s_releaseArray.Pointer;
cArray->release = ReleaseArrayPtr;
cArray->private_data = FromDisposable(allocationOwner);
allocationOwner = null;
}
Expand All @@ -112,7 +117,7 @@ private unsafe static void ConvertArray(ExportedAllocationOwner sharedOwner, Arr
cArray->length = array.Length;
cArray->offset = array.Offset;
cArray->null_count = array.NullCount;
cArray->release = (delegate* unmanaged[Stdcall]<CArrowArray*, void>)s_releaseArray.Pointer;
cArray->release = ReleaseArrayPtr;
cArray->private_data = null;

cArray->n_buffers = array.Buffers?.Length ?? 0;
Expand Down Expand Up @@ -157,7 +162,7 @@ private unsafe static void ConvertRecordBatch(ExportedAllocationOwner sharedOwne
cArray->length = batch.Length;
cArray->offset = 0;
cArray->null_count = 0;
cArray->release = (delegate* unmanaged[Stdcall]<CArrowArray*, void>)s_releaseArray.Pointer;
cArray->release = ReleaseArrayPtr;
cArray->private_data = null;

cArray->n_buffers = 1;
Expand All @@ -180,13 +185,12 @@ private unsafe static void ConvertRecordBatch(ExportedAllocationOwner sharedOwne
cArray->dictionary = null;
}

#if NET5_0_OR_GREATER
[UnmanagedCallersOnly]
#endif
private unsafe static void ReleaseArray(CArrowArray* cArray)
{
if (cArray->private_data != null)
{
Dispose(&cArray->private_data);
}
cArray->private_data = null;
Dispose(&cArray->private_data);
cArray->release = null;
}

Expand All @@ -199,6 +203,10 @@ private unsafe static void ReleaseArray(CArrowArray* cArray)
private unsafe static void Dispose(void** ptr)
{
GCHandle gch = GCHandle.FromIntPtr((IntPtr)(*ptr));
if (!gch.IsAllocated)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is effectively a no-op. If the pointer was null, the previous line will throw InvalidOperationException, and if it wasn't null, then IsAllocated will return true.

The overall change here also means that calling ReleaseArray twice will now throw an exception instead of the second call being a no-op.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment explaining when this might occur? (perhaps if an exception occurs while exporting the array?)

{
return;
}
((IDisposable)gch.Target).Dispose();
gch.Free();
*ptr = null;
Expand Down
2 changes: 1 addition & 1 deletion csharp/src/Apache.Arrow/C/CArrowArrayImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private ArrayData GetAsArrayData(CArrowArray* cArray, IArrowType type)
case ArrowTypeId.Map:
break;
case ArrowTypeId.Null:
buffers = new ArrowBuffer[0];
buffers = System.Array.Empty<ArrowBuffer>();
break;
case ArrowTypeId.Dictionary:
DictionaryType dictionaryType = (DictionaryType)type;
Expand Down
30 changes: 21 additions & 9 deletions csharp/src/Apache.Arrow/C/CArrowArrayStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,23 @@ public unsafe struct CArrowArrayStream
///
/// Return value: 0 if successful, an `errno`-compatible error code otherwise.
///</summary>
public delegate* unmanaged[Stdcall]<CArrowArrayStream*, CArrowSchema*, int> get_schema;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we make these function pointers internal, how are users of these APIs supposed to call them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They should not; the importer will take care of calling them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully follow. If I wanted to take some C arrow data (let's say coming from C++) and natively work with it - not using the "normal" C# ArrowTypes, Schema, RecordBatches, etc. but instead manually calling into the native functions on the CArrowArrayStream, that is no longer possible? Why not?

The reason for interoperating at the native layer would be for performance - I wouldn't need to allocate a bunch of managed objects just to interact with the Arrow information coming from some other library.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #35810 (comment) for an explanation on why the function pointer fields are internal.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option would be to only publicly expose the function pointer fields on net5.0+. Keep them internal for netstandard and netfx where they need to be declared Cdecl (and honestly function pointers aren't really meant to be used on netfx and netstandard since they came in C# 9 - https://learn.microsoft.com/en-us/dotnet/csharp/language-reference/configure-language-version).

That way users on net5+ can still call the function pointers, if they need to. For netstandard and netfx, they are no worse off - they can't call them with this change anyway.

internal delegate* unmanaged
#if !NET5_0_OR_GREATER
[Cdecl]
#endif
<CArrowArrayStream*, CArrowSchema*, int> get_schema;

/// <summary>
/// Callback to get the next array. If no error and the array is released, the stream has ended.
/// If successful, the ArrowArray must be released independently from the stream.
///
/// Return value: 0 if successful, an `errno`-compatible error code otherwise.
/// </summary>
public delegate* unmanaged[Stdcall]<CArrowArrayStream*, CArrowArray*, int> get_next;
internal delegate* unmanaged
#if !NET5_0_OR_GREATER
[Cdecl]
#endif
<CArrowArrayStream*, CArrowArray*, int> get_next;

/// <summary>
/// Callback to get optional detailed error information. This must only
Expand All @@ -54,13 +62,21 @@ public unsafe struct CArrowArrayStream
/// Return value: pointer to a null-terminated character array describing the last
/// error, or NULL if no description is available.
///</summary>
public delegate* unmanaged[Stdcall]<CArrowArrayStream*, byte*> get_last_error;
internal delegate* unmanaged
#if !NET5_0_OR_GREATER
[Cdecl]
#endif
<CArrowArrayStream*, byte*> get_last_error;

/// <summary>
/// Release callback: release the stream's own resources. Note that arrays returned by
/// get_next must be individually released.
/// </summary>
public delegate* unmanaged[Stdcall]<CArrowArrayStream*, void> release;
internal delegate* unmanaged
#if !NET5_0_OR_GREATER
[Cdecl]
#endif
<CArrowArrayStream*, void> release;

public void* private_data;

Expand All @@ -74,11 +90,7 @@ public unsafe struct CArrowArrayStream
{
var ptr = (CArrowArrayStream*)Marshal.AllocHGlobal(sizeof(CArrowArrayStream));

ptr->get_schema = null;
ptr->get_next = null;
ptr->get_last_error = null;
ptr->release = null;
ptr->private_data = null;
*ptr = default;

return ptr;
}
Expand Down
53 changes: 46 additions & 7 deletions csharp/src/Apache.Arrow/C/CArrowArrayStreamExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,37 @@


using System;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using Apache.Arrow.Ipc;

namespace Apache.Arrow.C
{
public static class CArrowArrayStreamExporter
{
#if NET5_0_OR_GREATER
// On .NET 5+ the callbacks are marked [UnmanagedCallersOnly], so their addresses can be
// taken directly and stored in the exported CArrowArrayStream.
private static unsafe delegate* unmanaged<CArrowArrayStream*, CArrowSchema*, int> GetSchemaPtr => &GetSchema;
private static unsafe delegate* unmanaged<CArrowArrayStream*, CArrowArray*, int> GetNextPtr => &GetNext;
private static unsafe delegate* unmanaged<CArrowArrayStream*, byte*> GetLastErrorPtr => &GetLastError;
private static unsafe delegate* unmanaged<CArrowArrayStream*, void> ReleasePtr => &Release;
#else
// On older targets each callback is wrapped in a NativeDelegate and exposed as a Cdecl
// function pointer. The wrapper fields are readonly (matching s_releaseArray in
// CArrowArrayExporter) so the instance behind an already-exported pointer can never be
// replaced after type initialization.
// NOTE(review): presumably NativeDelegate keeps the managed delegate alive for as long as
// the wrapper is referenced - confirm against NativeDelegate.cs.

// get_schema callback.
private unsafe delegate int GetSchemaArrayStream(CArrowArrayStream* cArrayStream, CArrowSchema* cSchema);
private static unsafe readonly NativeDelegate<GetSchemaArrayStream> s_getSchemaArrayStream = new NativeDelegate<GetSchemaArrayStream>(GetSchema);
private static unsafe delegate* unmanaged[Cdecl]<CArrowArrayStream*, CArrowSchema*, int> GetSchemaPtr =>
    (delegate* unmanaged[Cdecl]<CArrowArrayStream*, CArrowSchema*, int>)s_getSchemaArrayStream.Pointer;

// get_next callback.
private unsafe delegate int GetNextArrayStream(CArrowArrayStream* cArrayStream, CArrowArray* cArray);
private static unsafe readonly NativeDelegate<GetNextArrayStream> s_getNextArrayStream = new NativeDelegate<GetNextArrayStream>(GetNext);
private static unsafe delegate* unmanaged[Cdecl]<CArrowArrayStream*, CArrowArray*, int> GetNextPtr =>
    (delegate* unmanaged[Cdecl]<CArrowArrayStream*, CArrowArray*, int>)s_getNextArrayStream.Pointer;

// get_last_error callback.
private unsafe delegate byte* GetLastErrorArrayStream(CArrowArrayStream* cArrayStream);
private static unsafe readonly NativeDelegate<GetLastErrorArrayStream> s_getLastErrorArrayStream = new NativeDelegate<GetLastErrorArrayStream>(GetLastError);
private static unsafe delegate* unmanaged[Cdecl]<CArrowArrayStream*, byte*> GetLastErrorPtr =>
    (delegate* unmanaged[Cdecl]<CArrowArrayStream*, byte*>)s_getLastErrorArrayStream.Pointer;

// release callback.
private unsafe delegate void ReleaseArrayStream(CArrowArrayStream* cArrayStream);
private static unsafe readonly NativeDelegate<ReleaseArrayStream> s_releaseArrayStream = new NativeDelegate<ReleaseArrayStream>(Release);
private static unsafe delegate* unmanaged[Cdecl]<CArrowArrayStream*, void> ReleasePtr =>
    (delegate* unmanaged[Cdecl]<CArrowArrayStream*, void>)s_releaseArrayStream.Pointer;
#endif

/// <summary>
/// Export an <see cref="IArrowArrayStream"/> to a <see cref="CArrowArrayStream"/>.
Expand All @@ -55,12 +71,15 @@ public static unsafe void ExportArrayStream(IArrowArrayStream arrayStream, CArro
}

cArrayStream->private_data = ExportedArrayStream.Export(arrayStream);
cArrayStream->get_schema = (delegate* unmanaged[Stdcall]<CArrowArrayStream*, CArrowSchema*, int>)s_getSchemaArrayStream.Pointer;
cArrayStream->get_next = (delegate* unmanaged[Stdcall]<CArrowArrayStream*, CArrowArray*, int>)s_getNextArrayStream.Pointer;
cArrayStream->get_last_error = (delegate* unmanaged[Stdcall]<CArrowArrayStream*, byte*>)s_getLastErrorArrayStream.Pointer;
cArrayStream->release = (delegate* unmanaged[Stdcall]<CArrowArrayStream*, void>)s_releaseArrayStream.Pointer;
cArrayStream->get_schema = GetSchemaPtr;
cArrayStream->get_next = GetNextPtr;
cArrayStream->get_last_error = GetLastErrorPtr;
cArrayStream->release = ReleasePtr;
}

#if NET5_0_OR_GREATER
[UnmanagedCallersOnly]
#endif
private unsafe static int GetSchema(CArrowArrayStream* cArrayStream, CArrowSchema* cSchema)
{
ExportedArrayStream arrayStream = null;
Expand All @@ -76,6 +95,9 @@ private unsafe static int GetSchema(CArrowArrayStream* cArrayStream, CArrowSchem
}
}

#if NET5_0_OR_GREATER
[UnmanagedCallersOnly]
#endif
private unsafe static int GetNext(CArrowArrayStream* cArrayStream, CArrowArray* cArray)
{
ExportedArrayStream arrayStream = null;
Expand All @@ -96,6 +118,9 @@ private unsafe static int GetNext(CArrowArrayStream* cArrayStream, CArrowArray*
}
}

#if NET5_0_OR_GREATER
[UnmanagedCallersOnly]
#endif
private unsafe static byte* GetLastError(CArrowArrayStream* cArrayStream)
{
try
Expand All @@ -109,10 +134,12 @@ private unsafe static int GetNext(CArrowArrayStream* cArrayStream, CArrowArray*
}
}

#if NET5_0_OR_GREATER
[UnmanagedCallersOnly]
#endif
private unsafe static void Release(CArrowArrayStream* cArrayStream)
{
ExportedArrayStream arrayStream = ExportedArrayStream.FromPointer(cArrayStream->private_data);
arrayStream.Dispose();
ExportedArrayStream.Free(&cArrayStream->private_data);
cArrayStream->release = null;
}

Expand All @@ -136,6 +163,18 @@ sealed unsafe class ExportedArrayStream : IDisposable
return (void*)GCHandle.ToIntPtr(gch);
}

public static void Free(void** ptr)
{
GCHandle gch = GCHandle.FromIntPtr((IntPtr)ptr);
if (!gch.IsAllocated)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Same comment about IsAllocated.)

{
return;
}
((ExportedArrayStream)gch.Target).Dispose();
gch.Free();
*ptr = null;
}

public static ExportedArrayStream FromPointer(void* ptr)
{
GCHandle gch = GCHandle.FromIntPtr((IntPtr)ptr);
Expand Down Expand Up @@ -166,7 +205,7 @@ void ReleaseLastError()
{
if (LastError != null)
{
Marshal.FreeCoTaskMem((IntPtr)LastError);
Marshal.FreeHGlobal((IntPtr)LastError);
LastError = null;
}
}
Expand Down
23 changes: 19 additions & 4 deletions csharp/src/Apache.Arrow/C/CArrowArrayStreamImporter.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Licensed to the Apache Software Foundation (ASF) under one
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
Expand Down Expand Up @@ -55,6 +55,16 @@ private sealed unsafe class ImportedArrowArrayStream : IArrowArrayStream
private readonly Schema _schema;
private bool _disposed;

/// <summary>
/// Builds the exception message for a failed array stream callback.
/// </summary>
/// <param name="arrayStream">Stream whose <c>get_last_error</c> callback is invoked.</param>
/// <param name="errno">Error code returned by the failed callback; only used in the fallback message.</param>
/// <returns>
/// The text returned by <c>get_last_error</c>, converted with <c>StringUtil.PtrToStringUtf8</c>,
/// or a generic message containing <paramref name="errno"/> when the callback returns null.
/// </returns>
internal static string GetLastError(CArrowArrayStream* arrayStream, int errno)
{
    byte* message = arrayStream->get_last_error(arrayStream);

    return message != null
        ? StringUtil.PtrToStringUtf8(message)
        : $"Array stream operation failed with no message. Error code: {errno}";
}

public ImportedArrowArrayStream(CArrowArrayStream* cArrayStream)
{
if (cArrayStream == null)
Expand All @@ -70,7 +80,7 @@ public ImportedArrowArrayStream(CArrowArrayStream* cArrayStream)
int errno = cArrayStream->get_schema(cArrayStream, &cSchema);
if (errno != 0)
{
throw new Exception($"Unexpected error recieved from external stream. Errno: {errno}");
throw new Exception(GetLastError(cArrayStream, errno));
}
_schema = CArrowSchemaImporter.ImportSchema(&cSchema);

Expand All @@ -92,14 +102,19 @@ public ValueTask<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancell
throw new ObjectDisposedException(typeof(ImportedArrowArrayStream).Name);
}

if (cancellationToken.IsCancellationRequested)
{
return new(Task.FromCanceled<RecordBatch>(cancellationToken));
}

RecordBatch result = null;
CArrowArray cArray = new CArrowArray();
fixed (CArrowArrayStream* cArrayStream = &_cArrayStream)
{
int errno = cArrayStream->get_next(cArrayStream, &cArray);
if (errno != 0)
{
throw new Exception($"Unexpected error recieved from external stream. Errno: {errno}");
return new(Task.FromException<RecordBatch>(new Exception(GetLastError(cArrayStream, errno))));
}
if (cArray.release != null)
{
Expand All @@ -115,7 +130,7 @@ public void Dispose()
if (!_disposed && _cArrayStream.release != null)
{
_disposed = true;
fixed (CArrowArrayStream * cArrayStream = &_cArrayStream)
fixed (CArrowArrayStream* cArrayStream = &_cArrayStream)
{
cArrayStream->release(cArrayStream);
}
Expand Down
Loading
Loading