Skip to content

Commit

Permalink
ARROW-1695: [Serialization] Fix reference counting of numpy arrays created in custom serializer
Browse files Browse the repository at this point in the history

This uses the NumPyBuffer built into Arrow's Tensor facility to protect the numpy arrays holding the Tensors to be serialized. See also the problem description in https://issues.apache.org/jira/browse/ARROW-1695.

Author: Philipp Moritz <pcmoritz@gmail.com>

Closes #1220 from pcmoritz/fix-serialize-tensors and squashes the following commits:

7e23bb5 [Philipp Moritz] fix linting
dce92ad [Philipp Moritz] fix handling of numpy arrays generated in the custom serializer methods
  • Loading branch information
pcmoritz committed Oct 20, 2017
1 parent a8f5185 commit 971e99d
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 15 deletions.
27 changes: 12 additions & 15 deletions cpp/src/arrow/python/python_to_arrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -390,15 +390,15 @@ Status CallDeserializeCallback(PyObject* context, PyObject* value,

Status SerializeDict(PyObject* context, std::vector<PyObject*> dicts,
int32_t recursion_depth, std::shared_ptr<Array>* out,
std::vector<PyObject*>* tensors_out);
std::vector<std::shared_ptr<Tensor>>* tensors_out);

Status SerializeArray(PyObject* context, PyArrayObject* array, SequenceBuilder* builder,
std::vector<PyObject*>* subdicts,
std::vector<PyObject*>* tensors_out);
std::vector<std::shared_ptr<Tensor>>* tensors_out);

Status SerializeSequences(PyObject* context, std::vector<PyObject*> sequences,
int32_t recursion_depth, std::shared_ptr<Array>* out,
std::vector<PyObject*>* tensors_out);
std::vector<std::shared_ptr<Tensor>>* tensors_out);

Status AppendScalar(PyObject* obj, SequenceBuilder* builder) {
if (PyArray_IsScalar(obj, Bool)) {
Expand Down Expand Up @@ -444,7 +444,7 @@ Status AppendScalar(PyObject* obj, SequenceBuilder* builder) {
Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,
std::vector<PyObject*>* sublists, std::vector<PyObject*>* subtuples,
std::vector<PyObject*>* subdicts, std::vector<PyObject*>* subsets,
std::vector<PyObject*>* tensors_out) {
std::vector<std::shared_ptr<Tensor>>* tensors_out) {
// The bool case must precede the int case (PyInt_Check passes for bools)
if (PyBool_Check(elem)) {
RETURN_NOT_OK(builder->AppendBool(elem == Py_True));
Expand Down Expand Up @@ -525,7 +525,7 @@ Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,

Status SerializeArray(PyObject* context, PyArrayObject* array, SequenceBuilder* builder,
std::vector<PyObject*>* subdicts,
std::vector<PyObject*>* tensors_out) {
std::vector<std::shared_ptr<Tensor>>* tensors_out) {
int dtype = PyArray_TYPE(array);
switch (dtype) {
case NPY_UINT8:
Expand All @@ -540,7 +540,10 @@ Status SerializeArray(PyObject* context, PyArrayObject* array, SequenceBuilder*
case NPY_FLOAT:
case NPY_DOUBLE: {
RETURN_NOT_OK(builder->AppendTensor(static_cast<int32_t>(tensors_out->size())));
tensors_out->push_back(reinterpret_cast<PyObject*>(array));
std::shared_ptr<Tensor> tensor;
RETURN_NOT_OK(NdarrayToTensor(default_memory_pool(),
reinterpret_cast<PyObject*>(array), &tensor));
tensors_out->push_back(tensor);
} break;
default: {
PyObject* serialized_object;
Expand All @@ -556,7 +559,7 @@ Status SerializeArray(PyObject* context, PyArrayObject* array, SequenceBuilder*

Status SerializeSequences(PyObject* context, std::vector<PyObject*> sequences,
int32_t recursion_depth, std::shared_ptr<Array>* out,
std::vector<PyObject*>* tensors_out) {
std::vector<std::shared_ptr<Tensor>>* tensors_out) {
DCHECK(out);
if (recursion_depth >= kMaxRecursionDepth) {
return Status::NotImplemented(
Expand Down Expand Up @@ -603,7 +606,7 @@ Status SerializeSequences(PyObject* context, std::vector<PyObject*> sequences,

Status SerializeDict(PyObject* context, std::vector<PyObject*> dicts,
int32_t recursion_depth, std::shared_ptr<Array>* out,
std::vector<PyObject*>* tensors_out) {
std::vector<std::shared_ptr<Tensor>>* tensors_out) {
DictBuilder result;
if (recursion_depth >= kMaxRecursionDepth) {
return Status::NotImplemented(
Expand Down Expand Up @@ -686,14 +689,8 @@ Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject
PyDateTime_IMPORT;
std::vector<PyObject*> sequences = {sequence};
std::shared_ptr<Array> array;
std::vector<PyObject*> py_tensors;
RETURN_NOT_OK(SerializeSequences(context, sequences, 0, &array, &py_tensors));
RETURN_NOT_OK(SerializeSequences(context, sequences, 0, &array, &out->tensors));
out->batch = MakeBatch(array);
for (const auto& py_tensor : py_tensors) {
std::shared_ptr<Tensor> arrow_tensor;
RETURN_NOT_OK(NdarrayToTensor(default_memory_pool(), py_tensor, &arrow_tensor));
out->tensors.push_back(arrow_tensor);
}
return Status::OK();
}

Expand Down
21 changes: 21 additions & 0 deletions python/pyarrow/tests/test_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,27 @@ def test_numpy_immutable(large_memory_map):
result[0] = 1.0


# see https://issues.apache.org/jira/browse/ARROW-1695
def test_serialization_callback_numpy():
    # Regression test: a custom serializer that returns a numpy array
    # created inside the callback must serialize without error (the
    # array's only reference is the one produced by the callback).

    class DummyClass(object):
        pass

    def _to_ndarray(obj):
        # Freshly allocated array — nothing else holds a reference to it.
        return np.zeros(4)

    def _from_ndarray(serialized_obj):
        # Identity deserializer; the round-trip value is the array itself.
        return serialized_obj

    pa._default_serialization_context.register_type(
        DummyClass, "DummyClass", pickle=False,
        custom_serializer=_to_ndarray,
        custom_deserializer=_from_ndarray)

    pa.serialize(DummyClass())


@pytest.mark.skip(reason="extensive memory requirements")
def test_arrow_limits(self):
def huge_memory_map(temp_dir):
Expand Down

0 comments on commit 971e99d

Please sign in to comment.