GH-34971: [Format] Add non-CPU version of C Data Interface #34972
Conversation
I have a few questions about how `ArrowDeviceArrayStream` works.
cpp/src/arrow/c/abi.h (outdated):

/// The next call to `get_next` should provide an ArrowDeviceArray whose
/// device_id matches what is provided here, and whose device_type is the
/// same as the device_type member of this stream.
I'm not certain I follow. Isn't the `ArrowDeviceArray` passed to `get_next` an "out" parameter? Are you saying that the `ArrowDeviceArray` struct itself (not the buffers) needs to be allocated on the device?
No, I was referring to the `device_id` member and `device_type` member that get populated in the `ArrowDeviceArray` that is returned from `get_next`.
This is a weird API choice, all because you want the consumer to pass its CUDA stream of choice...
Ultimately this is a consequence of the fact that the existing frameworks and APIs don't provide any good way to manage a stream's lifetime, which makes having the consumer pass the stream the safest route to take.

I'm absolutely open to suggestions to make this better, as long as the consumer is able to pass in the desired stream.
> the existing frameworks and APIs don't provide any good way to manage the stream's lifetime easily

What do you mean by that? Would you care to give a more concrete example? For example, CUDA allows you to destroy a stream:
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__STREAM.html#group__CUDART__STREAM_1gfda584f1788ca983cb21c5f4d2033a62
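As a minimal sketch of that lifecycle (standard CUDA runtime calls; the helper function name is made up, and the open question in this thread is *when* the destroy can safely happen):

```c
#include <cuda_runtime.h>

/* Minimal sketch: a producer-owned CUDA stream has an explicit lifecycle. */
static void produce_and_clean_up(void) {
  cudaStream_t s;
  cudaStreamCreate(&s);
  /* ... enqueue kernels / async copies that fill the exported buffers ... */
  /* The difficulty for producers is not the API but the timing: the stream
   * must not be destroyed while a consumer may still be ordering work
   * against it. */
  cudaStreamSynchronize(s);  /* or defer destruction via events / refcounting */
  cudaStreamDestroy(s);
}
```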
There's a lot of discussion in that issue related to internal stream handling under contexts and schedulers and similar terms. It all boils down to the same discussion of many producers being unable to release or share ownership of their streams.
I think this comment does a good job of summarizing the options that were considered: dmlc/dlpack#57 (comment)
And then this comment summarizes discussion of those options: dmlc/dlpack#57 (comment)
The lifetime management of streams as defined by the Numba documentation for `__cuda_array_interface__` (https://numba.readthedocs.io/en/stable/cuda/cuda_array_interface.html#streams) requires that keeping the object (typically an array) that produces the `__cuda_array_interface__` alive also keeps the stream alive. In most cases libraries don't associate a stream with the object, since it's valid to use multiple streams with a single object.

Here's the current state of things across a handful of projects:

- Numba
  - Handles stream lifetime properly for `__cuda_array_interface__`, since they store a stream object as part of their array: https://github.com/numba/numba/blob/008077553b558bd183668ecd581d4d0bc54bd32c/numba/cuda/cudadrv/devicearray.py#L119-L139
  - Doesn't support `__dlpack__`
- CuPy
  - Doesn't implement stream lifetime properly for `__cuda_array_interface__` as far as I can tell: they just get a stream ptr as an integer, and if it's not the default stream someone could change the current stream ptr and end up having it be destroyed out from underneath it: https://github.com/cupy/cupy/blob/c92d5bc16293300297b843b4ebb364125697c131/cupy/_core/core.pyx#L258-L262 (cc @leofang)
  - Handles `__dlpack__` properly: https://github.com/cupy/cupy/blob/c92d5bc16293300297b843b4ebb364125697c131/cupy/_core/core.pyx#L285-L327
- PyTorch
  - Doesn't support passing the stream in `__cuda_array_interface__` currently: https://github.com/pytorch/pytorch/blob/def50d253401540cfdc6c0fffa444d0ee643cc11/torch/_tensor.py#L1005-L1066
  - Handles `__dlpack__` properly: https://github.com/pytorch/pytorch/blob/def50d253401540cfdc6c0fffa444d0ee643cc11/torch/_tensor.py#L1345
- Tensorflow and JAX
  - Don't support passing the stream in `__cuda_array_interface__` currently: https://github.com/tensorflow/tensorflow/blob/ea6a0f282d2b7ce20891dfc24ec8fe107eeaf22d/tensorflow/compiler/xla/python/py_buffer.cc#L254-L300
  - Don't support `__dlpack__`, but have explicit `to` and `from` functions for using dlpack pycapsules; these do not handle streams: https://github.com/tensorflow/tensorflow/blob/6a050b6c15ed2a545693bc171f5e95dacbe05839/tensorflow/compiler/xla/python/dlpack.cc#L285-L363
- cuDF
  - Doesn't support passing the stream in `__cuda_array_interface__` currently: https://github.com/rapidsai/cudf/blob/50718e673ff53b18706cf66c6e02cda8e30681fe/python/cudf/cudf/core/column/numerical.py#L169-L191
  - Doesn't support `__dlpack__`, but has explicit `to` and `from` functions for using dlpack pycapsules; these do not handle streams: https://github.com/rapidsai/cudf/blob/50718e673ff53b18706cf66c6e02cda8e30681fe/cpp/src/interop/dlpack.cpp#L218-L294
Thanks for the pointers.

> I think this comment does a good job of summarizing the options that were considered: dmlc/dlpack#57 (comment)

Yes, I read this. It looks like solution S1, which is also the one I'm proposing, is considered the most flexible (I don't understand the "harder for compilers" comment, though).

> And then this comment summarizes discussion of those options: dmlc/dlpack#57 (comment)

I read this too, but it doesn't actually mention S1, for reasons I wasn't able to understand.

> In most cases libraries don't associate a stream with the object since it's valid to use multiple streams with a single object.

But you have to actually synchronize on the right stream before being able to use the object, right? How does the user know which stream to synchronize on, if they didn't produce the data themselves?
> Yes, I read this. It looks like solution S1, which is also the one I'm proposing, is considered the most flexible (I don't understand the "harder for compilers" comment, though).

From dmlc/dlpack#57 (comment):

> It also brings extra burden to the compilers themselves. The compiler will need to generate optional synchronization code based on the streams, which is non-trivial.

I believe the compilers being referred to here are deep learning compilers like XLA, which do things like kernel fusion and set up execution graphs of kernels that use streams internally to parallelize the execution of said graphs.

> But you have to actually synchronize on the right stream before being able to use the object, right?

Something / someone needs to guarantee that there isn't a data race with regard to using multiple non-blocking streams, yes. That could be done with events, stream synchronization, or device synchronization.

> How does the user know which stream to synchronize on, if they didn't produce the data themselves?

If you're staying within your framework / library, then the expectation is for the framework / library to handle things for the user. If crossing framework / library boundaries, then the expectation is to rely on things like interchange protocols to handle the synchronization semantics.
> do you happen to know if there was any other discussion captured that could be linked here regarding the decision to have a consumer hand a stream to the producer

Sorry I wasn't able to respond promptly. Is the question still open?

In the case of CAI, it is required that someone handles the exporting stream's lifetime properly:

> Like data, CUDA streams also have a finite lifetime. It is therefore required that a Producer exporting data on the interface with an associated stream ensures that the exported stream's lifetime is equal to or surpasses the lifetime of the object from which the interface was exported.

and this was considered a burden when discussing the DLPack support. A few libraries like Numba, for example, had to hold a reference to the underlying stream. I believe this was the main concern for DLPack to place the requirement on the consumer instead of the producer.
cpp/src/arrow/c/abi.h (outdated):

/// \param[in] queue_ptr The appropriate queue, stream, or
/// equivalent object for the device that the data is allocated on
/// to indicate where the consumer wants the data to be accessible.
/// If queue_ptr is NULL then the default stream (e.g. CUDA stream 0)
/// should be used to ensure that the memory is accessible from any stream.
I'm a little confused here. It sounds like I need to call `get_next_device_id` to determine which queue to use, and then I need to pass that queue on to the call to `get_next`. But why? Why isn't the producer managing the queues?

If the producer controls which device id gets used (`get_next_device_id` seems to suggest this), then why does the consumer need to give it the queue? For example, if I were merging streams from two different devices it seems like I would do something like (apologies for the butchered pseudo-code)...
```cpp
// Dummy class merging two infinite streams in an inefficient round-robin fashion
class MergedStream {
 public:
  int get_next(ArrowDeviceArray* out) {
    if (merged_arrays_.empty()) {
      ArrowDeviceArray arr;
      left_.get_next(&arr);
      merged_arrays_.push(arr);
      right_.get_next(&arr);
      merged_arrays_.push(arr);
    }
    *out = merged_arrays_.front();
    merged_arrays_.pop();
    return 0;
  }

 private:
  ChildStream left_, right_;                    // upstream sources (pseudo-code)
  std::queue<ArrowDeviceArray> merged_arrays_;  // buffered chunks
};
```
@westonpace The idea here is that the consumer of the interface provides a queue to the producer and the producer is responsible for ensuring that the data is safe to consume on the provided queue.
The reason for doing this instead of the producer returning a pointer to a queue that the data is safe to consume on is that frameworks generally manage these queues internally and don't have a mechanism to share a queue and control its lifetime over a boundary like this.
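A rough sketch of that flow from the consumer's side follows; the `get_next` signature taking a queue pointer reflects the draft being discussed here and is illustrative only, with the struct declarations assumed from the draft abi.h and error handling simplified:

```c
#include <cuda_runtime.h>

/* Hypothetical consumer-side flow for a consumer-provided queue. */
static int consume_one_chunk(struct ArrowDeviceArrayStream* producer) {
  cudaStream_t my_stream;
  cudaStreamCreate(&my_stream);

  struct ArrowDeviceArray chunk;
  /* The consumer hands its own stream to the producer; the producer is then
   * responsible for making the returned buffers safe to use on that stream
   * (e.g. by ordering its pending work ahead of the consumer's). */
  int err = producer->get_next(producer, (void*)my_stream, &chunk);

  if (err == 0) {
    /* ... launch the consumer's kernels on my_stream using chunk's buffers ... */
    if (chunk.array.release != NULL) chunk.array.release(&chunk.array);
  }
  cudaStreamDestroy(my_stream);
  return err;
}
```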
The standard "mechanism to share a queue and control its lifetime over a boundary like this" in the C Data Interface would be the release callback.
That doesn't make sense, does it? How is the consumer supposed to manage the stream's lifetime if "there isn't a mechanism that you could call in the release callback to cleanly control the lifetime"?
Why wouldn't they? They can easily refcount the usage of their own CUDA streams.
> Why wouldn't they? They can easily refcount the usage of their own CUDA streams.

I think that is making a lot of assumptions about how folks use and manage CUDA streams 😄. Again, some places use them similarly to thread pools and only control the lifetime of the pool.
I tried to dig through Tensorflow's code to figure out exactly how they're managing the lifetime of their streams, but I'm not confident; everything I say below may not be correct:

- Something eventually calls down to `AllocateStream` and `DeallocateStream` (https://github.com/tensorflow/tensorflow/blob/b9fc6a9b611ec373c02e5b5ab432b1d7aff9392e/tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc#L759-L774) to create and destroy CUDA streams.
- These operate on raw ptrs, and it looks like there's a class that wraps these, `Stream` (https://github.com/tensorflow/tensorflow/blob/b9fc6a9b611ec373c02e5b5ab432b1d7aff9392e/tensorflow/compiler/xla/stream_executor/stream.cc#L262-L286), which has constructor, destructor, and init functions roughly as you'd expect.
- I believe these `Stream` objects are managed in a `StreamPool` (https://github.com/tensorflow/tensorflow/blob/b9fc6a9b611ec373c02e5b5ab432b1d7aff9392e/tensorflow/compiler/xla/service/stream_pool.h#L27-L59), which then allows "borrowing" the streams using unique ptrs.

I guess in theory, if they ultimately have `Stream` objects being used, one could be moved into the private data used by the release callback.
> I tried to dig through Tensorflow's code to figure exactly how they're managing the lifetime of their streams but I'm not confident

The fact that they're handling those lifetimes should be enough to plug in a refcounting mechanism (or interact with the GC, in the case of a managed language). This is already necessary to manage the lifetime of data exported through the C Data Interface.

I understand that they might not have a refcounting mechanism in place already, but that's basic engineering anyway.
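For instance, a producer whose streams are shared across several exported arrays could hold one reference per export and only destroy (or return to its pool) the stream when the last release callback fires. A minimal sketch of that idea (all names are illustrative, and a real implementation would make the count atomic or lock-protected):

```c
#include <stdlib.h>
#include <cuda_runtime.h>

/* Sketch of refcounting a shared producer-owned stream; each exported array
 * would hold one reference via its private_data and drop it in release. */
struct SharedStream {
  cudaStream_t stream;
  int refcount;  /* guard with an atomic or mutex in real code */
};

static struct SharedStream* shared_stream_ref(struct SharedStream* s) {
  s->refcount++;
  return s;
}

static void shared_stream_unref(struct SharedStream* s) {
  if (--s->refcount == 0) {
    cudaStreamDestroy(s->stream);
    free(s);
  }
}
```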
Okay, regardless: if we take the producer-provided path, then I think it makes a lot more sense for the producer to share an Event than a Stream.

An Event can be waited on via `cudaStreamWaitEvent` / `hipStreamWaitEvent`, which does a device-side wait with minimal overhead if it's the same stream, or via `cudaEventSynchronize` / `hipEventSynchronize` if blocking host code is desired.
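On the consumer side, that looks roughly like the following sketch; how the event actually crosses the ABI (here assumed to arrive as a `void*` carried alongside the `ArrowDeviceArray`) is the part still being settled in this thread:

```c
#include <cuda_runtime.h>

/* Consumer-side sketch: wait on a producer-recorded event before using the
 * exported buffers. The function name and the void* hand-off are assumptions. */
static void wait_for_producer(void* producer_event, cudaStream_t my_stream,
                              int block_host) {
  cudaEvent_t ev = (cudaEvent_t)producer_event;
  if (ev == NULL) return;  /* nothing to wait on */
  if (block_host) {
    cudaEventSynchronize(ev);               /* host blocks until the event fires */
  } else {
    cudaStreamWaitEvent(my_stream, ev, 0);  /* device-side wait; cheap if same stream */
  }
}
```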
Since it seems we're going to take the producer-provides-an-event path, there isn't really a need for the `get_next_device_id` callback anymore, correct? Or am I missing something?

@pitrou I think the other thing that hasn't been discussed in the above threads with regards to producer- vs consumer-provided stream is the development burden that will be incurred by the consumer in the different situations. In producer-provided stream / event:

In consumer-provided stream:
Ok, who is supposed to be the consumer of the C Device Data Interface? I would expect it to be an Arrow implementation, or an Arrow-compatible library. Realistically they probably already deal with CUDA streams if they support CUDA?

(also, to make things clear, I am not saying this proposal is wrong; I just want to make sure we evaluate the issues carefully and accurately - hence the questions)
Some minor comments, but LGTM overall.
Benchmark runs are scheduled for baseline = 9fb8697 and contender = 105b9df. 105b9df is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
After:

- https://github.com/zeroshade/arrow-non-cpu/tree/main
- https://lists.apache.org/thread/o2hsw7o1gm3qgw5z51rmz6zqxh0p7bvk
- apache/arrow#34972

Still in very much draft form; however, it *does* implement arbitrary ArrowArray copy to/from `ARROW_DEVICE_METAL`, `ARROW_DEVICE_CUDA`, `ARROW_DEVICE_CUDA_HOST`, and `ARROW_DEVICE_CPU`.

The nanoarrow_device extension as drafted here serves a similar purpose to nanoarrow: a means by which to create and consume the C ABI with the intention of shipping those structures to other libraries to do transformations, and potentially retrieving them again after the computation is complete. Perhaps another way to put it is that nanoarrow is designed to help at the edges: it can create and consume. Similarly, the nanoarrow_device extension is designed to help at the edges: it can copy/move arrays to and from CPU-land.

With this PR, you can currently do something like:

```c
struct ArrowDevice* gpu = ArrowDeviceMetalDefaultDevice();
// Alternatively, ArrowDeviceCuda(ARROW_DEVICE_CUDA, 0)
// or ArrowDeviceCuda(ARROW_DEVICE_CUDA_HOST, 0)
struct ArrowDevice* cpu = ArrowDeviceCpu();
struct ArrowArray array;
struct ArrowDeviceArray device_array;
struct ArrowDeviceArrayView device_array_view;

// Build a CPU array
ASSERT_EQ(ArrowArrayInitFromType(&array, NANOARROW_TYPE_STRING), NANOARROW_OK);
ASSERT_EQ(ArrowArrayStartAppending(&array), NANOARROW_OK);
ASSERT_EQ(ArrowArrayAppendString(&array, ArrowCharView("abc")), NANOARROW_OK);
ASSERT_EQ(ArrowArrayAppendString(&array, ArrowCharView("defg")), NANOARROW_OK);
ASSERT_EQ(ArrowArrayAppendNull(&array, 1), NANOARROW_OK);
ASSERT_EQ(ArrowArrayFinishBuildingDefault(&array, nullptr), NANOARROW_OK);

// Convert to a DeviceArray, still on the CPU
ArrowDeviceArrayInit(&device_array, cpu);
ArrowArrayMove(&array, &device_array.array);

// Parse contents into a view that can be copied to another device
ArrowDeviceArrayViewInit(&device_array_view);
ArrowArrayViewInitFromType(&device_array_view.array_view, string_type);
ASSERT_EQ(ArrowDeviceArrayViewSetArray(&device_array_view, &device_array, nullptr),
          NANOARROW_OK);

// Try to zero-copy move to another device, or copy if that is not possible.
// Zero-copy move is implemented for ARROW_DEVICE_METAL and ARROW_DEVICE_CUDA_HOST
// for the gpu -> cpu case.
struct ArrowDeviceArray device_array2;
device_array2.array.release = nullptr;
ASSERT_EQ(
    ArrowDeviceArrayTryMove(&device_array, &device_array_view, gpu, &device_array2),
    NANOARROW_OK);
```

In concrete terms, that means we need to know enough about a device to (1) copy and/or move an arbitrary `ArrowArray`/`ArrowSchema` pair to a device from the CPU and (2) copy/move an arbitrary `ArrowDeviceArray`/`ArrowSchema` pair back to the CPU. The three types of copying I support (and maybe there could be fewer/need to be more) are:

- `ArrowDeviceBufferInit()`: Make a non-owning buffer into an owning buffer on a device. The entry point if you want to take a slice of an `ArrowArrayView` and ship it to a device.
- `ArrowDeviceBufferMove()`: Move an existing (owning) buffer to a device. For devices like the CPU, this is a true zero-copy move; for shared memory this can also sometimes be zero-copy (e.g., Apple Metal -> CPU) but might also involve a copy.
- `ArrowDeviceBufferCopy()`: Copy a section of a buffer into a preallocated section of another buffer. I'm envisioning this to be necessary when copying a String, Binary, or List: we need the first and last values of the offsets buffer in order to know what portion of the data buffer to copy.

It seems unnecessary to copy 4 bytes of a buffer into an owning variant covered by the first bullet, but 🤷.

This PR currently provides support for the CPU device, Apple Metal, CUDA, and CUDA_HOST (i.e., CPU memory that has been registered with CUDA, which CUDA copies under the hood).

---------

Co-authored-by: Keith Kraus <keith.j.kraus@gmail.com>
… Data support (#40708)

### Rationale for this change

We defined a protocol exposing the C Data Interface (schema, array and stream) in Python through PyCapsule objects and dunder methods `__arrow_c_schema/array/stream__` (#35531 / #37797). We also expanded the C Data Interface with device capabilities: https://arrow.apache.org/docs/dev/format/CDeviceDataInterface.html (#34972). This expands the Python exposure of the interface with support for the newer Device structs.

### What changes are included in this PR?

Update the specification to define two additional dunders:

* `__arrow_c_device_array__` returns a pair of PyCapsules containing a C ArrowSchema and ArrowDeviceArray, where the latter uses "arrow_device_array" for the capsule name
* `__arrow_c_device_stream__` returns a PyCapsule containing a C ArrowDeviceArrayStream, where the capsule must have a name of "arrow_device_array_stream"

### Are these changes tested?

Spec-only change

* GitHub Issue: #38325

Lead-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Co-authored-by: Dewey Dunnington <dewey@dunnington.ca>
Co-authored-by: Antoine Pitrou <pitrou@free.fr>
Co-authored-by: Matt Topol <zotthewizard@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
### Rationale for this change

In order to support non-CPU devices and memory usage, we can add new `ArrowDeviceArray` and `ArrowDeviceArrayStream` structs to the C Data Interface in order to allow for handling these types of memory.

### What changes are included in this PR?

Definitions for the new `ArrowDeviceArray` and `ArrowDeviceArrayStream` structs and the `ArrowDeviceType` enum.
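For reference, a sketch of roughly what the added definitions look like (abridged; the authoritative versions live in `cpp/src/arrow/c/abi.h` and the C Device Data Interface documentation, and the device-type list here is truncated):

```c
// Abridged sketch of the added definitions; see cpp/src/arrow/c/abi.h for the
// authoritative versions.
typedef int32_t ArrowDeviceType;

#define ARROW_DEVICE_CPU 1        // CPU device
#define ARROW_DEVICE_CUDA 2       // CUDA GPU device
#define ARROW_DEVICE_CUDA_HOST 3  // pinned CUDA host memory
// ... additional values for OpenCL, Vulkan, Metal, ROCm, etc.

struct ArrowDeviceArray {
  struct ArrowArray array;      // the array data itself (buffers are device pointers)
  int64_t device_id;            // device id the buffers are allocated on
  ArrowDeviceType device_type;  // type of device holding the buffers
  void* sync_event;             // optional event the consumer should wait on
  int64_t reserved[3];          // reserved for future expansion
};

struct ArrowDeviceArrayStream {
  ArrowDeviceType device_type;  // device type all produced arrays will live on
  int (*get_schema)(struct ArrowDeviceArrayStream* self, struct ArrowSchema* out);
  int (*get_next)(struct ArrowDeviceArrayStream* self, struct ArrowDeviceArray* out);
  const char* (*get_last_error)(struct ArrowDeviceArrayStream* self);
  void (*release)(struct ArrowDeviceArrayStream* self);
  void* private_data;
};
```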