diff --git a/python/doc/source/plasma.rst b/python/doc/source/plasma.rst index e4665d187e1..9a5a74bfaf1 100644 --- a/python/doc/source/plasma.rst +++ b/python/doc/source/plasma.rst @@ -98,9 +98,46 @@ follows: def random_object_id(): return plasma.ObjectID(np.random.bytes(20)) +Putting and Getting Python Objects +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Creating an Object -^^^^^^^^^^^^^^^^^^ +Plasma supports two APIs for creating and accessing objects: A high level +API that allows storing and retrieving Python objects and a low level +API that allows creating, writing and sealing buffers and operating on +the binary data directly. In this section we describe the high level API. + +This is how you can put and get a Python object: + +.. code-block:: python + + # Create a python object. + object_id = client.put("hello, world") + + # Get the object. + client.get(object_id) + +This works with all Python objects supported by the Arrow Python object +serialization. + +You can also get multiple objects at the same time (which can be more +efficient since it avoids IPC round trips): + +.. code-block:: python + + # Create multiple python objects. + object_id1 = client.put(1) + object_id2 = client.put(2) + object_id3 = client.put(3) + + # Get the objects. + client.get([object_id1, object_id2, object_id3]) + +Furthermore, it is possible to provide a timeout for the get call. If the +object is not available within the timeout, the special object +`pyarrow.ObjectNotAvailable` will be returned. + +Creating an Object Buffer +^^^^^^^^^^^^^^^^^^^^^^^^^ Objects are created in Plasma in two stages. First, they are **created**, which allocates a buffer for the object. At this point, the client can write to the @@ -111,7 +148,7 @@ give the object's maximum size in bytes. .. code-block:: python - # Create an object. + # Create an object buffer. object_id = plasma.ObjectID(20 * b"a") object_size = 1000 buffer = memoryview(client.create(object_id, object_size)) @@ -129,11 +166,11 @@ immutable, and making it available to other Plasma clients. client.seal(object_id) -Getting an Object -^^^^^^^^^^^^^^^^^ +Getting an Object Buffer +^^^^^^^^^^^^^^^^^^^^^^^^ After an object has been sealed, any client who knows the object ID can get -the object. +the object buffer. .. code-block:: python @@ -143,11 +180,11 @@ the object. # Get the object in the second client. This blocks until the object has been sealed. object_id2 = plasma.ObjectID(20 * b"a") - [buffer2] = client2.get([object_id]) + [buffer2] = client2.get_buffers([object_id]) -If the object has not been sealed yet, then the call to client.get will block -until the object has been sealed by the client constructing the object. Using -the ``timeout_ms`` argument to get, you can specify a timeout for this (in +If the object has not been sealed yet, then the call to client.get_buffers will +block until the object has been sealed by the client constructing the object. +Using the ``timeout_ms`` argument to get, you can specify a timeout for this (in milliseconds). After the timeout, the interpreter will yield control back. .. code-block:: shell @@ -223,7 +260,7 @@ To read the object, first retrieve it as a ``PlasmaBuffer`` using its object ID. .. code-block:: python # Get the arrow object by ObjectID. - [buf2] = client.get([object_id]) + [buf2] = client.get_buffers([object_id]) To convert the ``PlasmaBuffer`` back into an Arrow ``Tensor``, first create a pyarrow ``BufferReader`` object from it. You can then pass the ``BufferReader`` @@ -310,13 +347,13 @@ Since we store the Pandas DataFrame as a PyArrow ``RecordBatch`` object, to get the object back from the Plasma store, we follow similar steps to those specified in `Getting Arrow Objects from Plasma`_. -We first have to convert the ``PlasmaBuffer`` returned from ``client.get`` -into an Arrow ``BufferReader`` object. +We first have to convert the ``PlasmaBuffer`` returned from +``client.get_buffers`` into an Arrow ``BufferReader`` object. .. code-block:: python # Fetch the Plasma object - [data] = client.get([object_id]) # Get PlasmaBuffer from ObjectID + [data] = client.get_buffers([object_id]) # Get PlasmaBuffer from ObjectID buffer = pa.BufferReader(data) From the ``BufferReader``, we can create a specific ``RecordBatchStreamReader`` diff --git a/python/examples/plasma/sorting/sort_df.py b/python/examples/plasma/sorting/sort_df.py index 03cfd13c6d7..0181ed729be 100644 --- a/python/examples/plasma/sorting/sort_df.py +++ b/python/examples/plasma/sorting/sort_df.py @@ -81,7 +81,7 @@ def put_df(df): def get_dfs(object_ids): """Retrieve dataframes from the object store given their object IDs.""" - buffers = client.get(object_ids) + buffers = client.get_buffers(object_ids) return [pa.RecordBatchStreamReader(buf).read_next_batch().to_pandas() for buf in buffers] diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 68ae017ce71..985e262bced 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -85,12 +85,14 @@ ArrowIOError, ArrowMemoryError, ArrowNotImplementedError, - ArrowTypeError) + ArrowTypeError, + PlasmaObjectExists) # Serialization from pyarrow.lib import (deserialize_from, deserialize, serialize, serialize_to, read_serialized, - SerializedPyObject) + SerializedPyObject, + SerializationException, DeserializationException) from pyarrow.filesystem import FileSystem, LocalFileSystem diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx index befa283d85b..aebef1b8812 100644 --- a/python/pyarrow/plasma.pyx +++ b/python/pyarrow/plasma.pyx @@ -26,6 +26,9 @@ from libcpp.vector cimport vector as c_vector from libc.stdint cimport int64_t, uint8_t, uintptr_t from cpython.pycapsule cimport * +import collections +import pyarrow + from pyarrow.lib cimport Buffer, NativeFile, check_status from pyarrow.includes.libarrow cimport (CMutableBuffer, CBuffer, CFixedSizeBufferWriter, CStatus) @@ -41,6 +44,9 @@ cdef extern from "plasma/common.h" nogil: @staticmethod CUniqueID from_binary(const c_string& binary) + @staticmethod + CUniqueID from_random() + c_bool operator==(const CUniqueID& rhs) const c_string hex() const @@ -157,6 +163,18 @@ cdef class ObjectID: """ return self.data.binary() + @staticmethod + def from_random(): + cdef CUniqueID data = CUniqueID.from_random() + return ObjectID(data.binary()) + + +cdef class ObjectNotAvailable: + """ + Placeholder for an object that was not available within the given timeout. + """ + pass + cdef class PlasmaBuffer(Buffer): """ @@ -285,7 +303,7 @@ cdef class PlasmaClient: metadata.size(), &data)) return self._make_mutable_plasma_buffer(object_id, data, data_size) - def get(self, object_ids, timeout_ms=-1): + def get_buffers(self, object_ids, timeout_ms=-1): """ Returns data buffer from the PlasmaStore based on object ID. @@ -296,7 +314,7 @@ cdef class PlasmaClient: ---------- object_ids : list A list of ObjectIDs used to identify some objects. - timeout_ms :int + timeout_ms : int The number of milliseconds that the get call should block before timing out and returning. Pass -1 if the call should block and 0 if the call should return immediately. @@ -352,6 +370,68 @@ cdef class PlasmaClient: object_buffers[i].metadata_size)) return result + def put(self, object value, ObjectID object_id=None): + """ + Store a Python value into the object store. + + Parameters + ---------- + value : object + A Python object to store. + object_id : ObjectID, default None + If this is provided, the specified object ID will be used to refer + to the object. + + Returns + ------- + The object ID associated to the Python object. + """ + cdef ObjectID target_id = object_id if object_id else ObjectID.from_random() + # TODO(pcm): Make serialization code support non-sequences and + # get rid of packing the value into a list here (and unpacking in get) + serialized = pyarrow.serialize([value]) + buffer = self.create(target_id, serialized.total_bytes) + stream = pyarrow.FixedSizeBufferOutputStream(buffer) + stream.set_memcopy_threads(4) + serialized.write_to(stream) + self.seal(target_id) + return target_id + + def get(self, object_ids, int timeout_ms=-1): + """ + Get one or more Python values from the object store. + + Parameters + ---------- + object_ids : list or ObjectID + Object ID or list of object IDs associated to the values we get from + the store. + timeout_ms : int, default -1 + The number of milliseconds that the get call should block before + timing out and returning. Pass -1 if the call should block and 0 + if the call should return immediately. + + Returns + ------- + list or object + Python value or list of Python values for the data associated with + the object_ids and ObjectNotAvailable if the object was not available. + """ + if isinstance(object_ids, collections.Sequence): + results = [] + buffers = self.get_buffers(object_ids, timeout_ms) + for i in range(len(object_ids)): + # buffers[i] is None if this object was not available within the + # timeout + if buffers[i]: + value, = pyarrow.deserialize(buffers[i]) + results.append(value) + else: + results.append(ObjectNotAvailable) + return results + else: + return self.get([object_ids], timeout_ms)[0] + def seal(self, ObjectID object_id): """ Seal the buffer in the PlasmaStore for a particular object ID. @@ -576,7 +656,7 @@ def connect(store_socket_name, manager_socket_name, int release_delay, The maximum number of objects that the client will keep and delay releasing (for caching reasons). num_retries : int, default -1 - Number of times tor ty to connect to plasma store. Default value of -1 + Number of times to try to connect to plasma store. Default value of -1 uses the default (50) """ cdef PlasmaClient result = PlasmaClient() diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index 04162bbbbad..d729c1ef2d2 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -82,8 +82,8 @@ def create_object(client, data_size, metadata_size, seal=True): def assert_get_object_equal(unit_test, client1, client2, object_id, memory_buffer=None, metadata=None): import pyarrow.plasma as plasma - client1_buff = client1.get([object_id])[0] - client2_buff = client2.get([object_id])[0] + client1_buff = client1.get_buffers([object_id])[0] + client2_buff = client2.get_buffers([object_id])[0] client1_metadata = client1.get_metadata([object_id])[0] client2_metadata = client2.get_metadata([object_id])[0] assert len(client1_buff) == len(client2_buff) @@ -187,7 +187,7 @@ def test_create(self): # Seal the object. self.plasma_client.seal(object_id) # Get the object. - memory_buffer = np.frombuffer(self.plasma_client.get([object_id])[0], + memory_buffer = np.frombuffer(self.plasma_client.get_buffers([object_id])[0], dtype="uint8") for i in range(length): assert memory_buffer[i] == i % 256 @@ -209,7 +209,7 @@ def test_create_with_metadata(self): self.plasma_client.seal(object_id) # Get the object. memory_buffer = np.frombuffer( - self.plasma_client.get([object_id])[0], dtype="uint8") + self.plasma_client.get_buffers([object_id])[0], dtype="uint8") for i in range(length): assert memory_buffer[i] == i % 256 # Get the metadata. @@ -241,7 +241,7 @@ def test_get(self): # Test timing out of get with various timeouts. for timeout in [0, 10, 100, 1000]: object_ids = [random_object_id() for _ in range(num_object_ids)] - results = self.plasma_client.get(object_ids, timeout_ms=timeout) + results = self.plasma_client.get_buffers(object_ids, timeout_ms=timeout) assert results == num_object_ids * [None] data_buffers = [] @@ -256,8 +256,8 @@ def test_get(self): # Test timing out from some but not all get calls with various # timeouts. for timeout in [0, 10, 100, 1000]: - data_results = self.plasma_client.get(object_ids, - timeout_ms=timeout) + data_results = self.plasma_client.get_buffers(object_ids, + timeout_ms=timeout) # metadata_results = self.plasma_client.get_metadata( # object_ids, timeout_ms=timeout) for i in range(num_object_ids): @@ -273,6 +273,19 @@ def test_get(self): else: assert results[i] is None + def test_put_and_get(self): + for value in [["hello", "world", 3, 1.0], None, "hello"]: + object_id = self.plasma_client.put(value) + [result] = self.plasma_client.get([object_id]) + assert result == value + + result = self.plasma_client.get(object_id) + assert result == value + + object_id = pa.plasma.ObjectID.from_random() + [result] = self.plasma_client.get([object_id], timeout_ms=0) + assert result == pa.plasma.ObjectNotAvailable + def test_store_arrow_objects(self): data = np.random.randn(10, 4) # Write an arrow object. @@ -284,7 +297,7 @@ def test_store_arrow_objects(self): pa.write_tensor(tensor, stream) self.plasma_client.seal(object_id) # Read the arrow object. - [tensor] = self.plasma_client.get([object_id]) + [tensor] = self.plasma_client.get_buffers([object_id]) reader = pa.BufferReader(tensor) array = pa.read_tensor(reader).to_numpy() # Assert that they are equal. @@ -313,7 +326,7 @@ def test_store_pandas_dataframe(self): self.plasma_client.seal(object_id) # Read the DataFrame. - [data] = self.plasma_client.get([object_id]) + [data] = self.plasma_client.get_buffers([object_id]) reader = pa.RecordBatchStreamReader(pa.BufferReader(data)) result = reader.get_next_batch().to_pandas() @@ -551,7 +564,7 @@ def test_illegal_functionality(self): # with pytest.raises(Exception): # illegal_assignment() # Get the object. - memory_buffer = self.plasma_client.get([object_id])[0] + memory_buffer = self.plasma_client.get_buffers([object_id])[0] # Make sure the object is read only. def illegal_assignment(): diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py index 013d86ebf9c..12bf65be69c 100644 --- a/python/pyarrow/tests/test_serialization.py +++ b/python/pyarrow/tests/test_serialization.py @@ -88,6 +88,7 @@ def array_custom_serializer(obj): def array_custom_deserializer(serialized_obj): return np.array(serialized_obj[0], dtype=np.dtype(serialized_obj[1])) + pa.lib.register_type(np.ndarray, 20 * b"\x00", pickle=False, custom_serializer=array_custom_serializer, custom_deserializer=array_custom_deserializer)