From a403400b77745312a667f389918b8062fef743e8 Mon Sep 17 00:00:00 2001
From: Pearu Peterson
Date: Tue, 9 Oct 2018 10:25:10 -0400
Subject: [PATCH] pyarrow and numba CUDA interop

Change-Id: I6057fdc68720c5d1c215ed16eb6aaedf0b67818e
---
 cpp/src/arrow/gpu/cuda_context.cc             |  25 ++++
 cpp/src/arrow/gpu/cuda_context.h              |   9 ++
 python/pyarrow/_cuda.pyx                      |  71 +++++++++-
 python/pyarrow/includes/libarrow_cuda.pxd     |   4 +
 .../pyarrow/tests/test_cuda_numba_interop.py  | 126 ++++++++++++++++++
 5 files changed, 232 insertions(+), 3 deletions(-)
 create mode 100644 python/pyarrow/tests/test_cuda_numba_interop.py

diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc
index 578c04a5a476c..4e118d6c023dd 100644
--- a/cpp/src/arrow/gpu/cuda_context.cc
+++ b/cpp/src/arrow/gpu/cuda_context.cc
@@ -44,11 +44,20 @@ class CudaContext::CudaContextImpl {
   Status Init(const CudaDevice& device) {
     device_ = device;
+    own_context_ = true;
     CU_RETURN_NOT_OK(cuCtxCreate(&context_, 0, device_.handle));
     is_open_ = true;
     return Status::OK();
   }
 
+  Status InitShared(const CudaDevice& device, CUcontext ctx) {
+    device_ = device;
+    own_context_ = false;
+    context_ = ctx;
+    is_open_ = true;
+    return Status::OK();
+  }
+
   Status Close() {
     if (is_open_ && own_context_) {
       CU_RETURN_NOT_OK(cuCtxDestroy(context_));
@@ -110,6 +119,8 @@ class CudaContext::CudaContextImpl {
 
   const CudaDevice device() const { return device_; }
 
+  const void* context_handle() const { return (void*)context_; }
+
  private:
   CudaDevice device_;
   CUcontext context_;
@@ -165,6 +176,12 @@ class CudaDeviceManager::CudaDeviceManagerImpl {
     return (*out)->impl_->Init(devices_[device_number]);
  }
 
+  Status CreateSharedContext(int device_number, CUcontext ctx,
+                             std::shared_ptr<CudaContext>* out) {
+    // TODO: check if context exists already; if so, return it.
+    *out = std::shared_ptr<CudaContext>(new CudaContext());
+    return (*out)->impl_->InitShared(devices_[device_number], ctx);
+  }
+
   Status GetContext(int device_number, std::shared_ptr<CudaContext>* out) {
     auto it = contexts_.find(device_number);
     if (it == contexts_.end()) {
@@ -212,6 +229,12 @@ Status CudaDeviceManager::CreateNewContext(int device_number,
   return impl_->CreateNewContext(device_number, out);
 }
 
+Status CudaDeviceManager::CreateSharedContext(int device_number,
+                                              void* ctx,
+                                              std::shared_ptr<CudaContext>* out) {
+  return impl_->CreateSharedContext(device_number, (CUcontext)ctx, out);
+}
+
 Status CudaDeviceManager::AllocateHost(int64_t nbytes,
                                        std::shared_ptr<CudaHostBuffer>* out) {
   uint8_t* data = nullptr;
@@ -276,5 +299,7 @@ Status CudaContext::OpenIpcBuffer(const CudaIpcMemHandle& ipc_handle,
 
 int64_t CudaContext::bytes_allocated() const { return impl_->bytes_allocated(); }
 
+const void* CudaContext::handle() const { return impl_->context_handle(); }
+
 }  // namespace gpu
 }  // namespace arrow
diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h
index 50ea94c9b8a66..e140776a6e46e 100644
--- a/cpp/src/arrow/gpu/cuda_context.h
+++ b/cpp/src/arrow/gpu/cuda_context.h
@@ -44,6 +44,12 @@ class ARROW_EXPORT CudaDeviceManager {
   /// In general code will use GetContext
   Status CreateNewContext(int gpu_number, std::shared_ptr<CudaContext>* ctx);
 
+  /// \brief Create a shared context for a given device number
+  /// \param[in] device_number
+  /// \param[in] handle CUDA context handle created by another library
+  /// \param[out] out shared context
+  Status CreateSharedContext(int device_number, void* handle,
+                             std::shared_ptr<CudaContext>* out);
+
   Status AllocateHost(int64_t nbytes, std::shared_ptr<CudaHostBuffer>* buffer);
 
   Status FreeHost(void* data, int64_t nbytes);
@@ -85,6 +91,9 @@ class ARROW_EXPORT CudaContext : public std::enable_shared_from_this<CudaContext> {
+  const void* handle() const;
+
diff --git a/python/pyarrow/_cuda.pyx b/python/pyarrow/_cuda.pyx
--- a/python/pyarrow/_cuda.pyx
+++ b/python/pyarrow/_cuda.pyx
+                                                     <void*>handle,
+                                                     &self.context))
         self.device_number = device_number
 
+    @staticmethod
+    def from_numba(context=None):
+        """Create Context instance from a numba CUDA context.
+
+        Parameters
+        ----------
+        context : {numba.cuda.cudadrv.driver.Context, None}
+          Specify the numba CUDA context instance. When None, use the
+          current numba context.
+
+        Returns
+        -------
+        shared_context : pyarrow.cuda.Context
+          Context instance.
+        """
+        if context is None:
+            import numba.cuda
+            context = numba.cuda.cudadrv.devices.get_context()
+        return Context(device_number=context.device.id,
+                       handle=context.handle.value)
+
+    def to_numba(self):
+        """Convert Context to a numba CUDA context.
+
+        Returns
+        -------
+        context : numba.cuda.cudadrv.driver.Context
+          Numba CUDA context instance.
+        """
+        import ctypes
+        import numba.cuda
+        device = numba.cuda.gpus[self.device_number]
+        handle = ctypes.c_void_p(self.handle)
+        context = numba.cuda.cudadrv.driver.Context(device, handle)
+
+        class DummyPendingDeallocs(object):
+            # The context is managed by pyarrow; numba must not
+            # track or free resources it does not own.
+            def add_item(self, *args, **kwargs):
+                pass
+
+        context.deallocations = DummyPendingDeallocs()
+        return context
+
     @staticmethod
     def get_num_devices():
         """ Return the number of GPU devices.
@@ -60,6 +110,12 @@ cdef class Context:
         """
         return self.device_number
 
+    @property
+    def handle(self):
+        """ Return pointer to the context handle.
+        """
+        return self.context.get().handle()
+
     cdef void init(self, const shared_ptr[CCudaContext]& ctx):
         self.context = ctx
 
@@ -232,6 +288,15 @@ cdef class CudaBuffer(Buffer):
         """
         check_status(CCudaBuffer.FromBuffer(buf_, &cbuf))
         return pyarrow_wrap_cudabuffer(cbuf)
 
+    def to_numba(self):
+        """Return a numba memory pointer (MemoryPointer) for this CudaBuffer.
+ """ + import ctypes + from numba.cuda.cudadrv.driver import MemoryPointer + return MemoryPointer(self.context.to_numba(), + pointer=ctypes.c_void_p(self.address), + size=self.size) + cdef getitem(self, int64_t i): return self.copy_to_host(position=i, nbytes=1)[0] diff --git a/python/pyarrow/includes/libarrow_cuda.pxd b/python/pyarrow/includes/libarrow_cuda.pxd index 913cf70a927a1..7ec4b2bbe5925 100644 --- a/python/pyarrow/includes/libarrow_cuda.pxd +++ b/python/pyarrow/includes/libarrow_cuda.pxd @@ -27,6 +27,9 @@ cdef extern from "arrow/gpu/cuda_api.h" namespace "arrow::gpu" nogil: CStatus GetContext(int gpu_number, shared_ptr[CCudaContext]* ctx) # CStatus CreateNewContext(int gpu_number, # shared_ptr[CCudaContext]* ctx) + CStatus CreateSharedContext(int gpu_number, + void* handle, + shared_ptr[CCudaContext]* ctx) CStatus AllocateHost(int64_t nbytes, shared_ptr[CCudaHostBuffer]* buffer) # CStatus FreeHost(void* data, int64_t nbytes) @@ -39,6 +42,7 @@ cdef extern from "arrow/gpu/cuda_api.h" namespace "arrow::gpu" nogil: CStatus OpenIpcBuffer(const CCudaIpcMemHandle& ipc_handle, shared_ptr[CCudaBuffer]* buffer) int64_t bytes_allocated() const + const void* handle() const cdef cppclass CCudaIpcMemHandle" arrow::gpu::CudaIpcMemHandle": @staticmethod diff --git a/python/pyarrow/tests/test_cuda_numba_interop.py b/python/pyarrow/tests/test_cuda_numba_interop.py new file mode 100644 index 0000000000000..162b810e41272 --- /dev/null +++ b/python/pyarrow/tests/test_cuda_numba_interop.py @@ -0,0 +1,126 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pytest +import pyarrow as pa +import numpy as np + +cuda = pytest.importorskip("pyarrow.cuda") +nb_cuda = pytest.importorskip("numba.cuda") + + +def make_random_buffer(size, target='host', ctx=None): + """Return a host or device buffer with random data. 
+ """ + if target == 'host': + assert size >= 0 + buf = pa.allocate_buffer(size) + arr = np.frombuffer(buf, dtype=np.uint8) + arr[:] = np.random.randint(low=0, high=255, size=size, dtype=np.uint8) + return arr, buf + elif target == 'device': + arr, buf = make_random_buffer(size, target='host') + dbuf = ctx.new_buffer(size) + dbuf.copy_from_host(buf, position=0, nbytes=size) + return arr, dbuf + raise ValueError('invalid target value') + + +def test_numba_memalloc(): + from numba.cuda.cudadrv.devicearray import DeviceNDArray + + # Create context instances + ctx = cuda.Context() + nb_ctx = ctx.to_numba() + assert ctx.handle == nb_ctx.handle.value + assert ctx.handle == nb_cuda.cudadrv.driver.driver.get_context().value + + # Dummy data + size = 10 + arr, buf = make_random_buffer(size, target='host') + + # Allocate memory using numba context + # Warning: this will not be reflected in pyarrow context manager + # (e.g bytes_allocated does not change) + mem = nb_ctx.memalloc(size) + darr = DeviceNDArray(arr.shape, arr.strides, arr.dtype, gpu_data=mem) + darr[:5] = 99 + darr[5:] = 88 + np.testing.assert_equal(darr.copy_to_host()[:5], 99) + np.testing.assert_equal(darr.copy_to_host()[5:], 88) + + # TODO: access the memory via CudaBuffer + address = mem.device_pointer.value + size = mem.size + print('address,size=', address, size) + + mem.free() + + +def test_pyarrow_memalloc(): + from numba.cuda.cudadrv.devicearray import DeviceNDArray + + ctx = cuda.Context() + + size = 10 + arr, cbuf = make_random_buffer(size, target='device', ctx=ctx) + + # wrap CudaBuffer with numba device array + mem = cbuf.to_numba() + darr = DeviceNDArray(arr.shape, arr.strides, arr.dtype, gpu_data=mem) + np.testing.assert_equal(darr.copy_to_host(), arr) + + +def test_numba_context(): + from numba.cuda.cudadrv.devicearray import DeviceNDArray + + size = 10 + with nb_cuda.gpus[0]: + # context is managed by numba + nb_ctx = nb_cuda.cudadrv.devices.get_context() + ctx = cuda.Context.from_numba(nb_ctx) + arr, cbuf = make_random_buffer(size, target='device', ctx=ctx) + assert cbuf.context.handle == nb_ctx.handle.value + mem = cbuf.to_numba() + darr = DeviceNDArray(arr.shape, arr.strides, arr.dtype, gpu_data=mem) + np.testing.assert_equal(darr.copy_to_host(), arr) + darr[0] = 99 + arr2 = np.frombuffer(cbuf.copy_to_host(), dtype=np.uint8) + assert arr2[0] == 99 + + +def test_pyarrow_jit(): + from numba.cuda.cudadrv.devicearray import DeviceNDArray + + # applying numba.cuda kernel to memory hold by CudaBuffer + ctx = cuda.Context() + size = 10 + arr, cbuf = make_random_buffer(size, target='device', ctx=ctx) + + @nb_cuda.jit + def increment_by_one(an_array): + pos = nb_cuda.grid(1) + if pos < an_array.size: + an_array[pos] += 1 + threadsperblock = 32 + blockspergrid = (arr.size + (threadsperblock - 1)) // threadsperblock + mem = cbuf.to_numba() + darr = DeviceNDArray(arr.shape, arr.strides, arr.dtype, gpu_data=mem) + increment_by_one[blockspergrid, threadsperblock](darr) + np.testing.assert_equal(np.frombuffer(cbuf.copy_to_host(), + dtype=arr.dtype), + arr + 1)