diff --git a/c_glib/configure.ac b/c_glib/configure.ac
index eabe7bad51227..f4f2c99bbc39e 100644
--- a/c_glib/configure.ac
+++ b/c_glib/configure.ac
@@ -143,7 +143,7 @@ AC_CONFIG_FILES([
   arrow-gpu-glib/arrow-gpu-glib.pc
   doc/Makefile
   doc/reference/Makefile
-  doc/reference/xml/Makefile
+  doc/reference/entities.xml
   example/Makefile
   example/lua/Makefile
   tool/Makefile
diff --git a/c_glib/doc/reference/Makefile.am b/c_glib/doc/reference/Makefile.am
index 4c005c237b300..454c2b0692da6 100644
--- a/c_glib/doc/reference/Makefile.am
+++ b/c_glib/doc/reference/Makefile.am
@@ -15,9 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-SUBDIRS = \
-	xml
-
 DOC_MODULE = arrow-glib
 
 DOC_MAIN_SGML_FILE = $(DOC_MODULE)-docs.xml
@@ -72,4 +69,5 @@ CLEANFILES += \
 	$(DOC_MODULE).types
 
 EXTRA_DIST += \
+	entities.xml.in \
 	meson.build
diff --git a/c_glib/doc/reference/arrow-glib-docs.xml b/c_glib/doc/reference/arrow-glib-docs.xml
index 51e7b2a6a6cf5..23d1e9a0f271a 100644
--- a/c_glib/doc/reference/arrow-glib-docs.xml
+++ b/c_glib/doc/reference/arrow-glib-docs.xml
@@ -21,10 +21,10 @@
                "http://www.oasis-open.org/docbook/xml/4.3/docbookx.dtd"
 [
-  <!ENTITY % gtkdocentities SYSTEM "xml/gtkdocentities.ent">
+  <!ENTITY % gtkdocentities SYSTEM "entities.xml">
   %gtkdocentities;
 ]>
-
+
     <title>&package_name; Reference Manual</title>
diff --git a/c_glib/doc/reference/xml/gtkdocentities.ent.in b/c_glib/doc/reference/entities.xml.in
similarity index 76%
rename from c_glib/doc/reference/xml/gtkdocentities.ent.in
rename to c_glib/doc/reference/entities.xml.in
index dc0cf1a0d8d4a..aa5addb4e8431 100644
--- a/c_glib/doc/reference/xml/gtkdocentities.ent.in
+++ b/c_glib/doc/reference/entities.xml.in
@@ -16,9 +16,9 @@
   specific language governing permissions and limitations
   under the License.
 -->
-<!ENTITY package "@package@">
-<!ENTITY package_bugreport "@package_bugreport@">
-<!ENTITY package_name "@package_name@">
-<!ENTITY package_string "@package_string@">
-<!ENTITY package_url "@package_url@">
-<!ENTITY package_version "@package_version@">
+<!ENTITY package "@PACKAGE@">
+<!ENTITY package_bugreport "@PACKAGE_BUGREPORT@">
+<!ENTITY package_name "@PACKAGE_NAME@">
+<!ENTITY package_string "@PACKAGE_STRING@">
+<!ENTITY package_url "@PACKAGE_URL@">
+<!ENTITY package_version "@PACKAGE_VERSION@">
diff --git a/c_glib/doc/reference/meson.build b/c_glib/doc/reference/meson.build
index 3374fbde5b9ed..431aa0a5c82a1 100644
--- a/c_glib/doc/reference/meson.build
+++ b/c_glib/doc/reference/meson.build
@@ -17,7 +17,18 @@
 # specific language governing permissions and limitations
 # under the License.
 
-subdir('xml')
+entities_conf = configuration_data()
+entities_conf.set('PACKAGE', meson.project_name())
+entities_conf.set('PACKAGE_BUGREPORT',
+                  'https://issues.apache.org/jira/browse/ARROW')
+entities_conf.set('PACKAGE_NAME', meson.project_name())
+entities_conf.set('PACKAGE_STRING',
+                  ' '.join([meson.project_name(), version]))
+entities_conf.set('PACKAGE_URL', 'https://arrow.apache.org/')
+entities_conf.set('PACKAGE_VERSION', version)
+configure_file(input: 'entities.xml.in',
+               output: 'entities.xml',
+               configuration: entities_conf)
 
 private_headers = [
 ]
diff --git a/c_glib/doc/reference/xml/Makefile.am b/c_glib/doc/reference/xml/Makefile.am
deleted file mode 100644
index 833cfddc69078..0000000000000
--- a/c_glib/doc/reference/xml/Makefile.am
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-EXTRA_DIST = \
-	gtkdocentities.ent.in \
-	meson.build
diff --git a/c_glib/doc/reference/xml/meson.build b/c_glib/doc/reference/xml/meson.build
deleted file mode 100644
index 5b65042764fee..0000000000000
--- a/c_glib/doc/reference/xml/meson.build
+++ /dev/null
@@ -1,31 +0,0 @@
-# -*- indent-tabs-mode: nil -*-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-entities_conf = configuration_data()
-entities_conf.set('package', meson.project_name())
-entities_conf.set('package_bugreport',
-                  'https://issues.apache.org/jira/browse/ARROW')
-entities_conf.set('package_name', meson.project_name())
-entities_conf.set('package_string',
-                  ' '.join([meson.project_name(), version]))
-entities_conf.set('package_url', 'https://arrow.apache.org/')
-entities_conf.set('package_version', version)
-configure_file(input: 'gtkdocentities.ent.in',
-               output: 'gtkdocentities.ent',
-               configuration: entities_conf)
diff --git a/ci/travis_lint.sh b/ci/travis_lint.sh
index e234b7b015b8d..6a2a0be18cf9f 100755
--- a/ci/travis_lint.sh
+++ b/ci/travis_lint.sh
@@ -35,10 +35,10 @@ popd
 # Fail fast on style checks
 sudo pip install flake8
 
-PYARROW_DIR=$TRAVIS_BUILD_DIR/python/pyarrow
+PYTHON_DIR=$TRAVIS_BUILD_DIR/python
 
-flake8 --count $PYARROW_DIR
+flake8 --count $PYTHON_DIR/pyarrow
 
 # Check Cython files with some checks turned off
 flake8 --count --config=$PYTHON_DIR/.flake8.cython \
-    $PYARROW_DIR
+    $PYTHON_DIR/pyarrow
diff --git a/cpp/src/arrow/python/numpy_to_arrow.cc b/cpp/src/arrow/python/numpy_to_arrow.cc
index f21b40ed3c246..c5c02e355ded6 100644
--- a/cpp/src/arrow/python/numpy_to_arrow.cc
+++ b/cpp/src/arrow/python/numpy_to_arrow.cc
@@ -175,7 +175,7 @@ static Status AppendObjectBinaries(PyArrayObject* arr, PyArrayObject* mask,
       continue;
     } else if (!PyBytes_Check(obj)) {
       std::stringstream ss;
-      ss << "Error converting to Python objects to bytes: ";
+      ss << "Error converting from Python objects to bytes: ";
       RETURN_NOT_OK(InvalidConversion(obj, "str, bytes", &ss));
       return Status::Invalid(ss.str());
     }
@@ -230,7 +230,7 @@ static Status AppendObjectStrings(PyArrayObject* arr, PyArrayObject* mask, int64
       *have_bytes = true;
     } else {
       std::stringstream ss;
-      ss << "Error converting to Python objects to String/UTF8: ";
+      ss << "Error converting from Python objects to String/UTF8: ";
       RETURN_NOT_OK(InvalidConversion(obj, "str, bytes", &ss));
       return Status::Invalid(ss.str());
     }
@@ -278,7 +278,7 @@ static Status AppendObjectFixedWidthBytes(PyArrayObject* arr, PyArrayObject* mas
       tmp_obj.reset(obj);
     } else if (!PyBytes_Check(obj)) {
       std::stringstream ss;
-      ss << "Error converting to Python objects to FixedSizeBinary: ";
+      ss << "Error converting from Python objects to FixedSizeBinary: ";
      RETURN_NOT_OK(InvalidConversion(obj, "str, bytes", &ss));
      return Status::Invalid(ss.str());
    }
diff --git a/cpp/src/arrow/table-test.cc b/cpp/src/arrow/table-test.cc
index 3f1c6be3a87f6..99e4dd5db5146 100644
--- a/cpp/src/arrow/table-test.cc
+++ b/cpp/src/arrow/table-test.cc
@@ -108,6 +108,21 @@ TEST_F(TestChunkedArray, EqualsDifferingLengths) {
   ASSERT_TRUE(one_->Equals(*another_.get()));
 }
 
+TEST_F(TestChunkedArray, SliceEquals) {
+  arrays_one_.push_back(MakeRandomArray<Int32Array>(100));
+  arrays_one_.push_back(MakeRandomArray<Int32Array>(50));
+  arrays_one_.push_back(MakeRandomArray<Int32Array>(50));
+  Construct();
+
+  std::shared_ptr<ChunkedArray> slice = one_->Slice(125, 50);
+  ASSERT_EQ(slice->length(), 50);
+  ASSERT_TRUE(slice->Equals(one_->Slice(125, 50)));
+
+  std::shared_ptr<ChunkedArray> slice2 = one_->Slice(75)->Slice(25)->Slice(25, 50);
+  ASSERT_EQ(slice2->length(), 50);
+  ASSERT_TRUE(slice2->Equals(slice));
+}
+
 class TestColumn : public TestChunkedArray {
  protected:
   void Construct() override {
@@ -158,6 +173,22 @@ TEST_F(TestColumn, ChunksInhomogeneous) {
   ASSERT_RAISES(Invalid, column_->ValidateData());
 }
 
+TEST_F(TestColumn, SliceEquals) {
+  arrays_one_.push_back(MakeRandomArray<Int32Array>(100));
+  arrays_one_.push_back(MakeRandomArray<Int32Array>(50));
+  arrays_one_.push_back(MakeRandomArray<Int32Array>(50));
+  one_field_ = field("column", int32());
+  Construct();
+
+  std::shared_ptr<Column> slice = one_col_->Slice(125, 50);
+  ASSERT_EQ(slice->length(), 50);
+  ASSERT_TRUE(slice->Equals(one_col_->Slice(125, 50)));
+
+  std::shared_ptr<Column> slice2 = one_col_->Slice(75)->Slice(25)->Slice(25, 50);
+  ASSERT_EQ(slice2->length(), 50);
+  ASSERT_TRUE(slice2->Equals(slice));
+}
+
 TEST_F(TestColumn, Equals) {
   std::vector<bool> null_bitmap(100, true);
   std::vector<int32_t> data(100, 1);
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index 2cf6c26523965..14877ccb537c2 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -102,6 +102,30 @@ bool ChunkedArray::Equals(const std::shared_ptr<ChunkedArray>& other) const {
   return Equals(*other.get());
 }
 
+std::shared_ptr<ChunkedArray> ChunkedArray::Slice(int64_t offset, int64_t length) const {
+  DCHECK_LE(offset, length_);
+
+  int curr_chunk = 0;
+  while (offset >= chunk(curr_chunk)->length()) {
+    offset -= chunk(curr_chunk)->length();
+    curr_chunk++;
+  }
+
+  ArrayVector new_chunks;
+  while (length > 0 && curr_chunk < num_chunks()) {
+    new_chunks.push_back(chunk(curr_chunk)->Slice(offset, length));
+    length -= chunk(curr_chunk)->length() - offset;
+    offset = 0;
+    curr_chunk++;
+  }
+
+  return std::make_shared<ChunkedArray>(new_chunks);
+}
+
+std::shared_ptr<ChunkedArray> ChunkedArray::Slice(int64_t offset) const {
+  return Slice(offset, length_);
+}
+
 Column::Column(const std::shared_ptr<Field>& field, const ArrayVector& chunks)
     : field_(field) {
   data_ = std::make_shared<ChunkedArray>(chunks);
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index c813b32ad36dc..570a650e7fa4a 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -44,6 +44,7 @@ class ARROW_EXPORT ChunkedArray {
   /// \return the total length of the chunked array; computed on construction
   int64_t length() const { return length_; }
 
+  /// \return the total number of nulls among all chunks
   int64_t null_count() const { return null_count_; }
 
   int num_chunks() const { return static_cast<int>(chunks_.size()); }
@@ -53,6 +54,20 @@ class ARROW_EXPORT ChunkedArray {
 
   const ArrayVector& chunks() const { return chunks_; }
 
+  /// \brief Construct a zero-copy slice of the chunked array with the
+  /// indicated offset and length
+  ///
+  /// \param[in] offset the position of the first element in the constructed
+  /// slice
+  /// \param[in] length the length of the slice. If there are not enough
+  /// elements in the chunked array, the length will be adjusted accordingly
+  ///
+  /// \return a new object wrapped in std::shared_ptr<ChunkedArray>
+  std::shared_ptr<ChunkedArray> Slice(int64_t offset, int64_t length) const;
+
+  /// \brief Slice from offset until end of the chunked array
+  std::shared_ptr<ChunkedArray> Slice(int64_t offset) const;
+
   std::shared_ptr<DataType> type() const;
 
   bool Equals(const ChunkedArray& other) const;
@@ -67,8 +82,9 @@ class ARROW_EXPORT ChunkedArray {
   ARROW_DISALLOW_COPY_AND_ASSIGN(ChunkedArray);
 };
 
+/// \class Column
 /// \brief An immutable column data structure consisting of a field (type
-/// metadata) and a logical chunked data array
+/// metadata) and a chunked data array
 class ARROW_EXPORT Column {
  public:
   Column(const std::shared_ptr<Field>& field, const ArrayVector& chunks);
@@ -97,6 +113,24 @@ class ARROW_EXPORT Column {
   /// \return the column's data as a chunked logical array
   std::shared_ptr<ChunkedArray> data() const { return data_; }
 
+  /// \brief Construct a zero-copy slice of the column with the indicated
+  /// offset and length
+  ///
+  /// \param[in] offset the position of the first element in the constructed
+  /// slice
+  /// \param[in] length the length of the slice. If there are not enough
+  /// elements in the column, the length will be adjusted accordingly
+  ///
+  /// \return a new object wrapped in std::shared_ptr<Column>
+  std::shared_ptr<Column> Slice(int64_t offset, int64_t length) const {
+    return std::make_shared<Column>(field_, data_->Slice(offset, length));
+  }
+
+  /// \brief Slice from offset until end of the column
+  std::shared_ptr<Column> Slice(int64_t offset) const {
+    return std::make_shared<Column>(field_, data_->Slice(offset));
+  }
+
   bool Equals(const Column& other) const;
 
   bool Equals(const std::shared_ptr<Column>& other) const;
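Usage sketch (not part of the patch): the new Slice is zero-copy, may span chunk boundaries, and clamps an over-long length rather than failing, per the doc comments above. Assuming a chunked array with chunk lengths 100, 50, and 50, mirroring TestChunkedArray.SliceEquals:

#include <cassert>
#include <memory>

#include "arrow/table.h"

// Sketch only: `chunked` is assumed to hold three chunks of lengths
// 100, 50, and 50 (200 elements in total).
void SliceExample(const std::shared_ptr<arrow::ChunkedArray>& chunked) {
  // Zero-copy slice of 50 elements starting at offset 125; the range begins
  // inside the second chunk and continues into the third, so the result
  // wraps two chunk slices.
  std::shared_ptr<arrow::ChunkedArray> middle = chunked->Slice(125, 50);
  assert(middle->length() == 50);

  // Omitting the length slices from the offset to the end of the array.
  assert(chunked->Slice(150)->length() == 50);

  // An over-long length is adjusted to the elements actually available.
  assert(chunked->Slice(150, 1000)->length() == 50);
}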
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index d74c0f412d97f..a683da0022b18 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -54,8 +54,6 @@
 
 namespace plasma {
 
-using arrow::MutableBuffer;
-
 // Number of threads used for memcopy and hash computations.
 constexpr int64_t kThreadPoolSize = 8;
 constexpr int64_t kBytesInMB = 1 << 20;
@@ -130,7 +128,7 @@ void PlasmaClient::increment_object_count(const ObjectID& object_id, PlasmaObjec
   // Increment the count of the number of objects in the memory-mapped file
   // that are being used. The corresponding decrement should happen in
   // PlasmaClient::Release.
-  auto entry = mmap_table_.find(object->handle.store_fd);
+  auto entry = mmap_table_.find(object->store_fd);
   ARROW_CHECK(entry != mmap_table_.end());
   ARROW_CHECK(entry->second.count >= 0);
   // Update the in_use_object_bytes_.
@@ -149,7 +147,7 @@ void PlasmaClient::increment_object_count(const ObjectID& object_id, PlasmaObjec
 
 Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
                             uint8_t* metadata, int64_t metadata_size,
-                            std::shared_ptr<Buffer>* data) {
+                            std::shared_ptr<MutableBuffer>* data) {
   ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size "
                    << data_size << " and metadata size " << metadata_size;
   RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, data_size, metadata_size));
@@ -157,7 +155,10 @@ Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
   RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaCreateReply, &buffer));
   ObjectID id;
   PlasmaObject object;
-  RETURN_NOT_OK(ReadCreateReply(buffer.data(), buffer.size(), &id, &object));
+  int store_fd;
+  int64_t mmap_size;
+  RETURN_NOT_OK(
+      ReadCreateReply(buffer.data(), buffer.size(), &id, &object, &store_fd, &mmap_size));
   // If the CreateReply included an error, then the store will not send a file
   // descriptor.
   int fd = recv_fd(store_conn_);
@@ -167,9 +168,7 @@ Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
   // The metadata should come right after the data.
   ARROW_CHECK(object.metadata_offset == object.data_offset + data_size);
   *data = std::make_shared<MutableBuffer>(
-      lookup_or_mmap(fd, object.handle.store_fd, object.handle.mmap_size) +
-          object.data_offset,
-      data_size);
+      lookup_or_mmap(fd, store_fd, mmap_size) + object.data_offset, data_size);
   // If plasma_create is being called from a transfer, then we will not copy the
   // metadata here. The metadata will be written along with the data streamed
   // from the transfer.
@@ -209,7 +208,7 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
       ARROW_CHECK(object_entry->second->is_sealed)
           << "Plasma client called get on an unsealed object that it created";
       PlasmaObject* object = &object_entry->second->object;
-      uint8_t* data = lookup_mmapped_file(object->handle.store_fd);
+      uint8_t* data = lookup_mmapped_file(object->store_fd);
       object_buffers[i].data =
           std::make_shared<Buffer>(data + object->data_offset, object->data_size);
       object_buffers[i].metadata = std::make_shared<Buffer>(
@@ -236,8 +235,19 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
   std::vector<ObjectID> received_object_ids(num_objects);
   std::vector<PlasmaObject> object_data(num_objects);
   PlasmaObject* object;
+  std::vector<int> store_fds;
+  std::vector<int64_t> mmap_sizes;
   RETURN_NOT_OK(ReadGetReply(buffer.data(), buffer.size(), received_object_ids.data(),
-                             object_data.data(), num_objects));
+                             object_data.data(), num_objects, store_fds, mmap_sizes));
+
+  // We mmap all of the file descriptors here so that we can look them up in
+  // the subsequent loop using just the store file descriptor, without having
+  // to know the relevant file descriptor received from recv_fd.
+  for (size_t i = 0; i < store_fds.size(); i++) {
+    int fd = recv_fd(store_conn_);
+    ARROW_CHECK(fd >= 0);
+    lookup_or_mmap(fd, store_fds[i], mmap_sizes[i]);
+  }
 
   for (int i = 0; i < num_objects; ++i) {
     DCHECK(received_object_ids[i] == object_ids[i]);
@@ -246,12 +256,6 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
       // If the object was already in use by the client, then the store should
       // have returned it.
       DCHECK_NE(object->data_size, -1);
-      // We won't use this file descriptor, but the store sent us one, so we
-      // need to receive it and then close it right away so we don't leak file
-      // descriptors.
-      int fd = recv_fd(store_conn_);
-      close(fd);
-      ARROW_CHECK(fd >= 0);
      // We've already filled out the information for this object, so we can
      // just continue.
      continue;
@@ -259,12 +263,7 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
     // If we are here, the object was not currently in use, so we need to
     // process the reply from the object store.
     if (object->data_size != -1) {
-      // The object was retrieved. The user will be responsible for releasing
-      // this object.
-      int fd = recv_fd(store_conn_);
-      uint8_t* data =
-          lookup_or_mmap(fd, object->handle.store_fd, object->handle.mmap_size);
-      ARROW_CHECK(fd >= 0);
+      uint8_t* data = lookup_mmapped_file(object->store_fd);
       // Finish filling out the return values.
       object_buffers[i].data =
           std::make_shared<Buffer>(data + object->data_offset, object->data_size);
@@ -296,7 +295,7 @@ Status PlasmaClient::UnmapObject(const ObjectID& object_id) {
   // Decrement the count of the number of objects in this memory-mapped file
   // that the client is using. The corresponding increment should have
   // happened in plasma_get.
-  int fd = object_entry->second->object.handle.store_fd;
+  int fd = object_entry->second->object.store_fd;
   auto entry = mmap_table_.find(fd);
   ARROW_CHECK(entry != mmap_table_.end());
   ARROW_CHECK(entry->second.count >= 1);
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 35182f8403201..a1e10a9c29969 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -31,8 +31,9 @@
 #include "arrow/util/visibility.h"
 #include "plasma/common.h"
 
-using arrow::Status;
 using arrow::Buffer;
+using arrow::MutableBuffer;
+using arrow::Status;
 
 namespace plasma {
 
@@ -115,7 +116,7 @@ class ARROW_EXPORT PlasmaClient {
   ///        will be written here.
   /// \return The return status.
   Status Create(const ObjectID& object_id, int64_t data_size, uint8_t* metadata,
-                int64_t metadata_size, std::shared_ptr<Buffer>* data);
+                int64_t metadata_size, std::shared_ptr<MutableBuffer>* data);
 
   /// Get some objects from the Plasma Store. This function will block until the
   /// objects have all been created and sealed in the Plasma Store or the
   /// timeout
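Usage sketch (not part of the patch): with the signature change, callers of Create() receive an arrow::MutableBuffer that stays writable until the object is sealed. This mirrors the pattern in the client tests further below; error handling is reduced to ARROW_CHECK_OK for brevity:

#include <cstdint>
#include <memory>

#include "plasma/client.h"

// Sketch only: create a small object, fill it in place, then seal it so
// other clients can Get() it.
void CreateExample(plasma::PlasmaClient* client, const plasma::ObjectID& id) {
  uint8_t metadata[] = {5};
  std::shared_ptr<arrow::MutableBuffer> data;
  ARROW_CHECK_OK(client->Create(id, /*data_size=*/4, metadata,
                                sizeof(metadata), &data));
  data->mutable_data()[0] = 1;  // writable until Seal()
  ARROW_CHECK_OK(client->Seal(id));
}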
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
index ea6dc8bb98da5..33803f7799ba0 100644
--- a/cpp/src/plasma/format/plasma.fbs
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -89,8 +89,6 @@ struct PlasmaObjectSpec {
   // Index of the memory segment (= memory mapped file) that
   // this object is allocated in.
   segment_index: int;
-  // Size in bytes of this segment (needed to call mmap).
-  mmap_size: ulong;
   // The offset in bytes in the memory mapped file of the data.
   data_offset: ulong;
   // The size in bytes of the data.
@@ -117,6 +115,12 @@ table PlasmaCreateReply {
   plasma_object: PlasmaObjectSpec;
   // Error that occurred for this call.
   error: PlasmaError;
+  // The file descriptor in the store that corresponds to the file descriptor
+  // being sent to the client right after this message.
+  store_fd: int;
+  // The size in bytes of the segment for the store file descriptor (needed to
+  // call mmap).
+  mmap_size: long;
 }
 
 table PlasmaAbortRequest {
@@ -156,9 +160,17 @@ table PlasmaGetReply {
   // objects if not all requested objects are stored and sealed
   // in the local Plasma store.
   object_ids: [string];
-  // Plasma object information, in the same order as their IDs.
+  // Plasma object information, in the same order as their IDs. The number of
+  // elements in both object_ids and plasma_objects arrays must agree.
   plasma_objects: [PlasmaObjectSpec];
-  // The number of elements in both object_ids and plasma_objects arrays must agree.
+  // A list of the file descriptors in the store that correspond to the file
+  // descriptors being sent to the client. The length of this list is the number
+  // of file descriptors that the store will send to the client after this
+  // message.
+  store_fds: [int];
+  // Size in bytes of the segment for each store file descriptor (needed to call
+  // mmap). This list must have the same length as store_fds.
+  mmap_sizes: [long];
 }
 
 table PlasmaReleaseRequest {
diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc
index 52d362013f1ae..3c5d107b2bbe3 100644
--- a/cpp/src/plasma/malloc.cc
+++ b/cpp/src/plasma/malloc.cc
@@ -197,4 +197,14 @@ void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* offse
   *offset = 0;
 }
 
+int64_t get_mmap_size(int fd) {
+  for (const auto& entry : mmap_records) {
+    if (entry.second.fd == fd) {
+      return entry.second.size;
+    }
+  }
+  ARROW_LOG(FATAL) << "failed to find entry in mmap_records for fd " << fd;
+  return -1;  // This code is never reached.
+}
+
 void set_malloc_granularity(int value) { change_mparam(M_GRANULARITY, value); }
diff --git a/cpp/src/plasma/malloc.h b/cpp/src/plasma/malloc.h
index 0df720db59817..cb8c600b14b3b 100644
--- a/cpp/src/plasma/malloc.h
+++ b/cpp/src/plasma/malloc.h
@@ -23,6 +23,12 @@
 void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_length, ptrdiff_t* offset);
 
+/// Get the mmap size corresponding to a specific file descriptor.
+///
+/// @param fd The file descriptor to look up.
+/// @return The size of the corresponding memory-mapped file.
+int64_t get_mmap_size(int fd);
+
 void set_malloc_granularity(int value);
 
 #endif  // MALLOC_H
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
index 603ff8a4fac6c..2d07c919a18f4 100644
--- a/cpp/src/plasma/plasma.h
+++ b/cpp/src/plasma/plasma.h
@@ -64,20 +64,12 @@ struct Client;
 /// Mapping from object IDs to type and status of the request.
 typedef std::unordered_map<ObjectID, ObjectRequest, UniqueIDHasher> ObjectRequestMap;
 
-/// Handle to access memory mapped file and map it into client address space.
-struct object_handle {
+// TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec.
+struct PlasmaObject {
   /// The file descriptor of the memory mapped file in the store. It is used as
   /// a unique identifier of the file in the client to look up the corresponding
   /// file descriptor on the client's side.
   int store_fd;
-  /// The size in bytes of the memory mapped file.
-  int64_t mmap_size;
-};
-
-// TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec.
-struct PlasmaObject {
-  /// Handle for memory mapped file the object is stored in.
-  object_handle handle;
   /// The offset in bytes in the memory mapped file of the data.
   ptrdiff_t data_offset;
   /// The offset in bytes in the memory mapped file of the metadata.
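A simplified sketch (not code from the patch; names are illustrative) of the client-side bookkeeping that store_fd enables — each segment is mapped at most once and afterwards resolved by the store's descriptor alone. The real table in client.cc additionally tracks a use count (entry->second.count):

#include <cstdint>
#include <unordered_map>

struct MmapEntry {
  uint8_t* address;  // where the segment is mapped in this process
  int64_t size;      // taken from the mmap_size / mmap_sizes reply fields
};

// Keyed by the store's descriptor (PlasmaObject::store_fd), not by the
// descriptor the client received from recv_fd().
std::unordered_map<int, MmapEntry> client_mmap_table;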
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index c0ebb88fe5019..6c0bc0cab28bb 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -73,30 +73,32 @@ Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id,
   return Status::OK();
 }
 
-Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object,
-                       int error_code) {
+Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int error_code,
+                       int64_t mmap_size) {
   flatbuffers::FlatBufferBuilder fbb;
-  PlasmaObjectSpec plasma_object(object->handle.store_fd, object->handle.mmap_size,
-                                 object->data_offset, object->data_size,
+  PlasmaObjectSpec plasma_object(object->store_fd, object->data_offset, object->data_size,
                                  object->metadata_offset, object->metadata_size);
-  auto message =
-      CreatePlasmaCreateReply(fbb, fbb.CreateString(object_id.binary()), &plasma_object,
-                              static_cast<PlasmaError>(error_code));
+  auto message = CreatePlasmaCreateReply(
+      fbb, fbb.CreateString(object_id.binary()), &plasma_object,
+      static_cast<PlasmaError>(error_code), object->store_fd, mmap_size);
   return PlasmaSend(sock, MessageType_PlasmaCreateReply, &fbb, message);
 }
 
 Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
-                       PlasmaObject* object) {
+                       PlasmaObject* object, int* store_fd, int64_t* mmap_size) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaCreateReply>(data);
   DCHECK(verify_flatbuffer(message, data, size));
   *object_id = ObjectID::from_binary(message->object_id()->str());
-  object->handle.store_fd = message->plasma_object()->segment_index();
-  object->handle.mmap_size = message->plasma_object()->mmap_size();
+  object->store_fd = message->plasma_object()->segment_index();
   object->data_offset = message->plasma_object()->data_offset();
   object->data_size = message->plasma_object()->data_size();
   object->metadata_offset = message->plasma_object()->metadata_offset();
   object->metadata_size = message->plasma_object()->metadata_size();
+
+  *store_fd = message->store_fd();
+  *mmap_size = message->mmap_size();
+
   return plasma_error_status(message->error());
 }
 
@@ -389,24 +391,29 @@ Status ReadGetRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_
 
 Status SendGetReply(
     int sock, ObjectID object_ids[],
     std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher>& plasma_objects,
-    int64_t num_objects) {
+    int64_t num_objects, const std::vector<int>& store_fds,
+    const std::vector<int64_t>& mmap_sizes) {
   flatbuffers::FlatBufferBuilder fbb;
   std::vector<PlasmaObjectSpec> objects;
 
-  for (int i = 0; i < num_objects; ++i) {
+  ARROW_CHECK(store_fds.size() == mmap_sizes.size());
+
+  for (int64_t i = 0; i < num_objects; ++i) {
     const PlasmaObject& object = plasma_objects[object_ids[i]];
-    objects.push_back(PlasmaObjectSpec(object.handle.store_fd, object.handle.mmap_size,
-                                       object.data_offset, object.data_size,
-                                       object.metadata_offset, object.metadata_size));
+    objects.push_back(PlasmaObjectSpec(object.store_fd, object.data_offset,
+                                       object.data_size, object.metadata_offset,
+                                       object.metadata_size));
   }
   auto message = CreatePlasmaGetReply(fbb, to_flatbuffer(&fbb, object_ids, num_objects),
-                                      fbb.CreateVectorOfStructs(objects.data(), num_objects));
+                                      fbb.CreateVectorOfStructs(objects.data(), num_objects),
+                                      fbb.CreateVector(store_fds), fbb.CreateVector(mmap_sizes));
   return PlasmaSend(sock, MessageType_PlasmaGetReply, &fbb, message);
 }
 
 Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
-                    PlasmaObject plasma_objects[], int64_t num_objects) {
+                    PlasmaObject plasma_objects[], int64_t num_objects,
+                    std::vector<int>& store_fds, std::vector<int64_t>& mmap_sizes) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaGetReply>(data);
   DCHECK(verify_flatbuffer(message, data, size));
@@ -415,13 +422,17 @@ Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
   }
   for (uoffset_t i = 0; i < num_objects; ++i) {
     const PlasmaObjectSpec* object = message->plasma_objects()->Get(i);
-    plasma_objects[i].handle.store_fd = object->segment_index();
-    plasma_objects[i].handle.mmap_size = object->mmap_size();
+    plasma_objects[i].store_fd = object->segment_index();
     plasma_objects[i].data_offset = object->data_offset();
     plasma_objects[i].data_size = object->data_size();
     plasma_objects[i].metadata_offset = object->metadata_offset();
     plasma_objects[i].metadata_size = object->metadata_size();
   }
+  ARROW_CHECK(message->store_fds()->size() == message->mmap_sizes()->size());
+  for (uoffset_t i = 0; i < message->store_fds()->size(); i++) {
+    store_fds.push_back(message->store_fds()->Get(i));
+    mmap_sizes.push_back(message->mmap_sizes()->Get(i));
+  }
   return Status::OK();
 }
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
index e8c334f9181fc..44263a6418439 100644
--- a/cpp/src/plasma/protocol.h
+++ b/cpp/src/plasma/protocol.h
@@ -46,10 +46,11 @@ Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size,
 Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id,
                          int64_t* data_size, int64_t* metadata_size);
 
-Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int error);
+Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int error,
+                       int64_t mmap_size);
 
 Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
-                       PlasmaObject* object);
+                       PlasmaObject* object, int* store_fd, int64_t* mmap_size);
 
 Status SendAbortRequest(int sock, ObjectID object_id);
 
@@ -81,10 +82,12 @@ Status ReadGetRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_
 
 Status SendGetReply(
     int sock, ObjectID object_ids[],
     std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher>& plasma_objects,
-    int64_t num_objects);
+    int64_t num_objects, const std::vector<int>& store_fds,
+    const std::vector<int64_t>& mmap_sizes);
 
 Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
-                    PlasmaObject plasma_objects[], int64_t num_objects);
+                    PlasmaObject plasma_objects[], int64_t num_objects,
+                    std::vector<int>& store_fds, std::vector<int64_t>& mmap_sizes);
 
 /* Plasma Release message functions. */
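For orientation, a sketch (not part of the patch) of the create handshake these signatures implement — the client half appears in client.cc above, the store half in store.cc below:

// Store -> client, on a successful create:
//   1. PlasmaCreateReply, which now carries store_fd and mmap_size.
//   2. send_fd() with the actual descriptor for that segment.
// Client:
//   1. ReadCreateReply(..., &object, &store_fd, &mmap_size);
//   2. int fd = recv_fd(store_conn_);
//   3. lookup_or_mmap(fd, store_fd, mmap_size);  // maps the segment once
// Subsequent lookups use lookup_mmapped_file(store_fd) alone.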
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index dde7f9cdfa8eb..80dd525e3e3b4 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -192,8 +192,7 @@ int PlasmaStore::create_object(const ObjectID& object_id, int64_t data_size,
   entry->state = PLASMA_CREATED;
 
   store_info_.objects[object_id] = std::move(entry);
-  result->handle.store_fd = fd;
-  result->handle.mmap_size = map_size;
+  result->store_fd = fd;
   result->data_offset = offset;
   result->metadata_offset = offset + data_size;
   result->data_size = data_size;
@@ -211,8 +210,7 @@ void PlasmaObject_init(PlasmaObject* object, ObjectTableEntry* entry) {
   DCHECK(object != NULL);
   DCHECK(entry != NULL);
   DCHECK(entry->state == PLASMA_SEALED);
-  object->handle.store_fd = entry->fd;
-  object->handle.mmap_size = entry->map_size;
+  object->store_fd = entry->fd;
   object->data_offset = entry->offset;
   object->metadata_offset = entry->offset + entry->info.data_size;
   object->data_size = entry->info.data_size;
@@ -220,34 +218,44 @@ void PlasmaObject_init(PlasmaObject* object, ObjectTableEntry* entry) {
 }
 
 void PlasmaStore::return_from_get(GetRequest* get_req) {
+  // Figure out how many file descriptors we need to send.
+  std::unordered_set<int> fds_to_send;
+  std::vector<int> store_fds;
+  std::vector<int64_t> mmap_sizes;
+  for (const auto& object_id : get_req->object_ids) {
+    PlasmaObject& object = get_req->objects[object_id];
+    int fd = object.store_fd;
+    if (object.data_size != -1 && fds_to_send.count(fd) == 0) {
+      fds_to_send.insert(fd);
+      store_fds.push_back(fd);
+      mmap_sizes.push_back(get_mmap_size(fd));
+    }
+  }
+
   // Send the get reply to the client.
   Status s = SendGetReply(get_req->client->fd, &get_req->object_ids[0], get_req->objects,
-                          get_req->object_ids.size());
+                          get_req->object_ids.size(), store_fds, mmap_sizes);
   warn_if_sigpipe(s.ok() ? 0 : -1, get_req->client->fd);
   // If we successfully sent the get reply message to the client, then also send
   // the file descriptors.
   if (s.ok()) {
     // Send all of the file descriptors for the present objects.
-    for (const auto& object_id : get_req->object_ids) {
-      PlasmaObject& object = get_req->objects[object_id];
-      // We use the data size to indicate whether the object is present or not.
-      if (object.data_size != -1) {
-        int error_code = send_fd(get_req->client->fd, object.handle.store_fd);
-        // If we failed to send the file descriptor, loop until we have sent it
-        // successfully. TODO(rkn): This is problematic for two reasons. First
-        // of all, sending the file descriptor should just succeed without any
-        // errors, but sometimes I see a "Message too long" error number.
-        // Second, looping like this allows a client to potentially block the
-        // plasma store event loop which should never happen.
-        while (error_code < 0) {
-          if (errno == EMSGSIZE) {
-            ARROW_LOG(WARNING) << "Failed to send file descriptor, retrying.";
-            error_code = send_fd(get_req->client->fd, object.handle.store_fd);
-            continue;
-          }
-          warn_if_sigpipe(error_code, get_req->client->fd);
-          break;
+    for (int store_fd : store_fds) {
+      int error_code = send_fd(get_req->client->fd, store_fd);
+      // If we failed to send the file descriptor, loop until we have sent it
+      // successfully. TODO(rkn): This is problematic for two reasons. First
+      // of all, sending the file descriptor should just succeed without any
+      // errors, but sometimes I see a "Message too long" error number.
+      // Second, looping like this allows a client to potentially block the
+      // plasma store event loop which should never happen.
+      while (error_code < 0) {
+        if (errno == EMSGSIZE) {
+          ARROW_LOG(WARNING) << "Failed to send file descriptor, retrying.";
+          error_code = send_fd(get_req->client->fd, store_fd);
+          continue;
         }
+        warn_if_sigpipe(error_code, get_req->client->fd);
+        break;
       }
     }
   }
@@ -640,10 +648,15 @@ Status PlasmaStore::process_message(Client* client) {
           ReadCreateRequest(input, input_size, &object_id, &data_size, &metadata_size));
       int error_code =
           create_object(object_id, data_size, metadata_size, client, &object);
-      HANDLE_SIGPIPE(SendCreateReply(client->fd, object_id, &object, error_code),
-                     client->fd);
+      int64_t mmap_size = 0;
+      if (error_code == PlasmaError_OK) {
+        mmap_size = get_mmap_size(object.store_fd);
+      }
+      HANDLE_SIGPIPE(
+          SendCreateReply(client->fd, object_id, &object, error_code, mmap_size),
+          client->fd);
       if (error_code == PlasmaError_OK) {
-        warn_if_sigpipe(send_fd(client->fd, object.handle.store_fd), client->fd);
+        warn_if_sigpipe(send_fd(client->fd, object.store_fd), client->fd);
       }
     } break;
     case MessageType_PlasmaAbortRequest: {
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index f19c2bfbdb380..63b56934f3599 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -70,7 +70,7 @@ TEST_F(TestPlasmaStore, DeleteTest) {
   int64_t data_size = 100;
   uint8_t metadata[] = {5};
   int64_t metadata_size = sizeof(metadata);
-  std::shared_ptr<Buffer> data;
+  std::shared_ptr<MutableBuffer> data;
   ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
   ARROW_CHECK_OK(client_.Seal(object_id));
 
@@ -96,7 +96,7 @@ TEST_F(TestPlasmaStore, ContainsTest) {
   int64_t data_size = 100;
   uint8_t metadata[] = {5};
   int64_t metadata_size = sizeof(metadata);
-  std::shared_ptr<Buffer> data;
+  std::shared_ptr<MutableBuffer> data;
   ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
   ARROW_CHECK_OK(client_.Seal(object_id));
   // Avoid race condition of Plasma Manager waiting for notification.
@@ -119,7 +119,7 @@ TEST_F(TestPlasmaStore, GetTest) {
   int64_t data_size = 4;
   uint8_t metadata[] = {5};
   int64_t metadata_size = sizeof(metadata);
-  std::shared_ptr<Buffer> data_buffer;
+  std::shared_ptr<MutableBuffer> data_buffer;
   uint8_t* data;
   ARROW_CHECK_OK(
       client_.Create(object_id, data_size, metadata, metadata_size, &data_buffer));
@@ -145,7 +145,7 @@ TEST_F(TestPlasmaStore, MultipleGetTest) {
   int64_t data_size = 4;
   uint8_t metadata[] = {5};
   int64_t metadata_size = sizeof(metadata);
-  std::shared_ptr<Buffer> data;
+  std::shared_ptr<MutableBuffer> data;
   ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data));
   data->mutable_data()[0] = 1;
   ARROW_CHECK_OK(client_.Seal(object_id1));
@@ -172,7 +172,7 @@ TEST_F(TestPlasmaStore, AbortTest) {
   int64_t data_size = 4;
   uint8_t metadata[] = {5};
   int64_t metadata_size = sizeof(metadata);
-  std::shared_ptr<Buffer> data;
+  std::shared_ptr<MutableBuffer> data;
   uint8_t* data_ptr;
   ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
   data_ptr = data->mutable_data();
@@ -220,7 +220,7 @@ TEST_F(TestPlasmaStore, MultipleClientTest) {
   int64_t data_size = 100;
   uint8_t metadata[] = {5};
   int64_t metadata_size = sizeof(metadata);
-  std::shared_ptr<Buffer> data;
+  std::shared_ptr<MutableBuffer> data;
   ARROW_CHECK_OK(client2_.Create(object_id, data_size, metadata, metadata_size, &data));
   ARROW_CHECK_OK(client2_.Seal(object_id));
   // Test that the first client can get the object.
@@ -260,7 +260,7 @@ TEST_F(TestPlasmaStore, ManyObjectTest) {
     int64_t data_size = 100;
     uint8_t metadata[] = {5};
     int64_t metadata_size = sizeof(metadata);
-    std::shared_ptr<Buffer> data;
+    std::shared_ptr<MutableBuffer> data;
     ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
 
     if (i % 3 == 0) {
diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc
index b593b6ae94890..656b2cc6b9bca 100644
--- a/cpp/src/plasma/test/serialization_tests.cc
+++ b/cpp/src/plasma/test/serialization_tests.cc
@@ -63,8 +63,7 @@ PlasmaObject random_plasma_object(void) {
   int random = rand_r(&seed);
   PlasmaObject object;
   memset(&object, 0, sizeof(object));
-  object.handle.store_fd = random + 7;
-  object.handle.mmap_size = random + 42;
+  object.store_fd = random + 7;
   object.data_offset = random + 1;
   object.metadata_offset = random + 2;
   object.data_size = random + 3;
@@ -94,13 +93,19 @@ TEST(PlasmaSerialization, CreateReply) {
   int fd = create_temp_file();
   ObjectID object_id1 = ObjectID::from_random();
   PlasmaObject object1 = random_plasma_object();
-  ARROW_CHECK_OK(SendCreateReply(fd, object_id1, &object1, 0));
+  int64_t mmap_size1 = 1000000;
+  ARROW_CHECK_OK(SendCreateReply(fd, object_id1, &object1, 0, mmap_size1));
   std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaCreateReply);
   ObjectID object_id2;
   PlasmaObject object2;
   memset(&object2, 0, sizeof(object2));
-  ARROW_CHECK_OK(ReadCreateReply(data.data(), data.size(), &object_id2, &object2));
+  int store_fd;
+  int64_t mmap_size2;
+  ARROW_CHECK_OK(ReadCreateReply(data.data(), data.size(), &object_id2, &object2,
+                                 &store_fd, &mmap_size2));
   ASSERT_EQ(object_id1, object_id2);
+  ASSERT_EQ(object1.store_fd, store_fd);
+  ASSERT_EQ(mmap_size1, mmap_size2);
   ASSERT_EQ(memcmp(&object1, &object2, sizeof(object1)), 0);
   close(fd);
 }
@@ -158,13 +163,20 @@ TEST(PlasmaSerialization, GetReply) {
   std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher> plasma_objects;
   plasma_objects[object_ids[0]] = random_plasma_object();
   plasma_objects[object_ids[1]] = random_plasma_object();
-  ARROW_CHECK_OK(SendGetReply(fd, object_ids, plasma_objects, 2));
+  std::vector<int> store_fds = {1, 2, 3};
+  std::vector<int64_t> mmap_sizes = {100, 200, 300};
+  ARROW_CHECK_OK(SendGetReply(fd, object_ids, plasma_objects, 2, store_fds, mmap_sizes));
   std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaGetReply);
   ObjectID object_ids_return[2];
   PlasmaObject plasma_objects_return[2];
+  std::vector<int> store_fds_return;
+  std::vector<int64_t> mmap_sizes_return;
   memset(&plasma_objects_return, 0, sizeof(plasma_objects_return));
   ARROW_CHECK_OK(ReadGetReply(data.data(), data.size(), object_ids_return,
-                              &plasma_objects_return[0], 2));
+                              &plasma_objects_return[0], 2, store_fds_return,
+                              mmap_sizes_return));
   ASSERT_EQ(object_ids[0], object_ids_return[0]);
   ASSERT_EQ(object_ids[1], object_ids_return[1]);
   ASSERT_EQ(memcmp(&plasma_objects[object_ids[0]], &plasma_objects_return[0],
                    sizeof(PlasmaObject)),
             0);
   ASSERT_EQ(memcmp(&plasma_objects[object_ids[1]], &plasma_objects_return[1],
                    sizeof(PlasmaObject)),
             0);
+  ASSERT_TRUE(store_fds == store_fds_return);
+  ASSERT_TRUE(mmap_sizes == mmap_sizes_return);
   close(fd);
 }
diff --git a/dev/gen_apidocs/create_documents.sh b/dev/gen_apidocs/create_documents.sh
index 54031262b3a5d..3100d3b984b3a 100755
--- a/dev/gen_apidocs/create_documents.sh
+++ b/dev/gen_apidocs/create_documents.sh
@@ -87,8 +87,6 @@ if [ -f Makefile ]; then
   # Ensure updating to prevent auto re-configure
   touch configure **/Makefile
   make distclean
-  # Work around for 'make distclean' removes doc/reference/xml/
-  git checkout doc/reference/xml
 fi
 ./autogen.sh
 rm -rf build_docs
diff --git a/python/pyarrow/_orc.pxd b/python/pyarrow/_orc.pxd
index 411691510423c..c07a19442b577 100644
--- a/python/pyarrow/_orc.pxd
+++ b/python/pyarrow/_orc.pxd
@@ -29,9 +29,10 @@ from pyarrow.includes.libarrow cimport (CArray, CSchema, CStatus,
                                         TimeUnit)
 
-cdef extern from "arrow/adapters/orc/adapter.h" namespace "arrow::adapters::orc" nogil:
-    cdef cppclass ORCFileReader:
+cdef extern from "arrow/adapters/orc/adapter.h" \
+        namespace "arrow::adapters::orc" nogil:
 
+    cdef cppclass ORCFileReader:
         @staticmethod
         CStatus Open(const shared_ptr[RandomAccessFile]& file,
                      CMemoryPool* pool,
@@ -40,7 +41,8 @@
         CStatus ReadSchema(shared_ptr[CSchema]* out)
 
         CStatus ReadStripe(int64_t stripe, shared_ptr[CRecordBatch]* out)
-        CStatus ReadStripe(int64_t stripe, std_vector[int], shared_ptr[CRecordBatch]* out)
+        CStatus ReadStripe(int64_t stripe, std_vector[int],
+                           shared_ptr[CRecordBatch]* out)
 
         CStatus Read(shared_ptr[CTable]* out)
         CStatus Read(std_vector[int], shared_ptr[CTable]* out)
diff --git a/python/pyarrow/_orc.pyx b/python/pyarrow/_orc.pyx
index 7ff4bac6dc95f..cf04f48a32319 100644
--- a/python/pyarrow/_orc.pyx
+++ b/python/pyarrow/_orc.pyx
@@ -50,7 +50,7 @@ cdef class ORCReader:
         get_reader(source, &rd_handle)
         with nogil:
             check_status(ORCFileReader.Open(rd_handle, self.allocator,
-                         &self.reader))
+                                            &self.reader))
 
     def schema(self):
         """
@@ -69,10 +69,10 @@ cdef class ORCReader:
         return pyarrow_wrap_schema(sp_arrow_schema)
 
     def nrows(self):
-        return deref(self.reader).NumberOfRows();
+        return deref(self.reader).NumberOfRows()
 
     def nstripes(self):
-        return deref(self.reader).NumberOfStripes();
+        return deref(self.reader).NumberOfStripes()
 
     def read_stripe(self, n, include_indices=None):
         cdef:
@@ -85,11 +85,13 @@ cdef class ORCReader:
 
         if include_indices is None:
             with nogil:
-                check_status(deref(self.reader).ReadStripe(stripe, &sp_record_batch))
+                (check_status(deref(self.reader)
+                 .ReadStripe(stripe, &sp_record_batch)))
         else:
             indices = include_indices
             with nogil:
-                check_status(deref(self.reader).ReadStripe(stripe, indices, &sp_record_batch))
+                (check_status(deref(self.reader)
+                 .ReadStripe(stripe, indices, &sp_record_batch)))
 
         batch = RecordBatch()
         batch.init(sp_record_batch)
diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx
index 29e233b6e4e67..801d094194b71 100644
--- a/python/pyarrow/plasma.pyx
+++ b/python/pyarrow/plasma.pyx
@@ -81,7 +81,7 @@ cdef extern from "plasma/client.h" nogil:
 
         CStatus Create(const CUniqueID& object_id, int64_t data_size,
                        const uint8_t* metadata, int64_t metadata_size,
-                       const shared_ptr[CBuffer]* data)
+                       const shared_ptr[CMutableBuffer]* data)
 
         CStatus Get(const CUniqueID* object_ids, int64_t num_objects,
                     int64_t timeout_ms, CObjectBuffer* object_buffers)
@@ -248,8 +248,8 @@ cdef class PlasmaClient:
             check_status(self.client.get().Get(ids.data(), ids.size(),
                                                timeout_ms, result[0].data()))
 
-    cdef _make_plasma_buffer(self, ObjectID object_id, shared_ptr[CBuffer] buffer,
-                             int64_t size):
+    cdef _make_plasma_buffer(self, ObjectID object_id,
+                             shared_ptr[CBuffer] buffer, int64_t size):
         result = PlasmaBuffer(object_id, self)
         result.init(buffer)
         return result
@@ -297,12 +297,14 @@ cdef class PlasmaClient:
             not be created because the plasma store is unable to evict
            enough objects to create room for it.
        """
-        cdef shared_ptr[CBuffer] data
+        cdef shared_ptr[CMutableBuffer] data
        with nogil:
            check_status(self.client.get().Create(object_id.data, data_size,
                                                  <uint8_t*>(metadata.data()),
                                                  metadata.size(), &data))
-        return self._make_mutable_plasma_buffer(object_id, data.get().mutable_data(), data_size)
+        return self._make_mutable_plasma_buffer(object_id,
+                                                data.get().mutable_data(),
+                                                data_size)
diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi
index d95d582fe537e..e7a39905f1f65 100644
--- a/python/pyarrow/serialization.pxi
+++ b/python/pyarrow/serialization.pxi
@@ -50,6 +50,8 @@ cdef class SerializationContext:
     object types_to_pickle
     object custom_serializers
     object custom_deserializers
+    object pickle_serializer
+    object pickle_deserializer
 
     def __init__(self):
         # Types with special serialization handlers
@@ -58,6 +60,23 @@ cdef class SerializationContext:
         self.types_to_pickle = set()
         self.custom_serializers = dict()
         self.custom_deserializers = dict()
+        self.pickle_serializer = pickle.dumps
+        self.pickle_deserializer = pickle.loads
+
+    def set_pickle(self, serializer, deserializer):
+        """
+        Set the serializer and deserializer to use for objects that are to be
+        pickled.
+
+        Parameters
+        ----------
+        serializer : callable
+            The serializer to use (e.g., pickle.dumps or cloudpickle.dumps).
+        deserializer : callable
+            The deserializer to use (e.g., pickle.loads or cloudpickle.loads).
+        """
+        self.pickle_serializer = serializer
+        self.pickle_deserializer = deserializer
 
     def clone(self):
         """
@@ -72,6 +91,8 @@ cdef class SerializationContext:
         result.whitelisted_types = self.whitelisted_types.copy()
         result.custom_serializers = self.custom_serializers.copy()
         result.custom_deserializers = self.custom_deserializers.copy()
+        result.pickle_serializer = self.pickle_serializer
+        result.pickle_deserializer = self.pickle_deserializer
 
         return result
 
@@ -119,7 +140,8 @@ cdef class SerializationContext:
             # use the closest match to type(obj)
             type_id = self.type_to_type_id[type_]
             if type_id in self.types_to_pickle:
-                serialized_obj = {"data": pickle.dumps(obj), "pickle": True}
+                serialized_obj = {"data": self.pickle_serializer(obj),
+                                  "pickle": True}
             elif type_id in self.custom_serializers:
                 serialized_obj = {"data": self.custom_serializers[type_id](obj)}
             else:
@@ -139,7 +161,7 @@ cdef class SerializationContext:
 
         if "pickle" in serialized_obj:
             # The object was pickled, so unpickle it.
-            obj = pickle.loads(serialized_obj["data"])
+            obj = self.pickle_deserializer(serialized_obj["data"])
         else:
             assert type_id not in self.types_to_pickle
             if type_id not in self.whitelisted_types:
diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py
index 6116556386b1a..e4681e3a59751 100644
--- a/python/pyarrow/tests/test_serialization.py
+++ b/python/pyarrow/tests/test_serialization.py
@@ -555,3 +555,42 @@ def test_deserialize_buffer_in_different_process():
     dir_path = os.path.dirname(os.path.realpath(__file__))
     python_file = os.path.join(dir_path, 'deserialize_buffer.py')
     subprocess.check_call(['python', python_file, f.name])
+
+
+def test_set_pickle():
+    # Use a custom type to trigger pickling.
+    class Foo(object):
+        pass
+
+    context = pa.SerializationContext()
+    context.register_type(Foo, 'Foo', pickle=True)
+
+    test_object = Foo()
+
+    # Define a custom serializer and deserializer to use in place of pickle.
+
+    def dumps1(obj):
+        return b'custom'
+
+    def loads1(serialized_obj):
+        return serialized_obj + b' serialization 1'
+
+    # Test that setting a custom pickler changes the behavior.
+    context.set_pickle(dumps1, loads1)
+    serialized = pa.serialize(test_object, context=context).to_buffer()
+    deserialized = pa.deserialize(serialized.to_pybytes(), context=context)
+    assert deserialized == b'custom serialization 1'
+
+    # Define another custom serializer and deserializer.
+
+    def dumps2(obj):
+        return b'custom'
+
+    def loads2(serialized_obj):
+        return serialized_obj + b' serialization 2'
+
+    # Test that setting another custom pickler changes the behavior again.
+    context.set_pickle(dumps2, loads2)
+    serialized = pa.serialize(test_object, context=context).to_buffer()
+    deserialized = pa.deserialize(serialized.to_pybytes(), context=context)
+    assert deserialized == b'custom serialization 2'
diff --git a/site/_data/versions.yml b/site/_data/versions.yml
new file mode 100644
index 0000000000000..0d04183868dcf
--- /dev/null
+++ b/site/_data/versions.yml
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Database of Apache Arrow releases (WIP)
+# Blogs and other pages use this data
+#
+current:
+  number: '0.8.0'
+  date: '18 December 2017'
+  git-tag: '1d689e5'
+  github-tag-link: 'https://github.com/apache/arrow/releases/tag/apache-arrow-0.8.0'
+  release-notes: 'http://arrow.apache.org/release/0.8.0.html'
+  mirrors: 'https://www.apache.org/dyn/closer.cgi/arrow/arrow-0.8.0/'
+  mirrors-tar: 'https://www.apache.org/dyn/closer.cgi/arrow/arrow-0.8.0/apache-arrow-0.8.0.tar.gz'
+  java-artifacts: 'http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.arrow%22%20AND%20v%3A%220.8.0%22'
+  asc: 'https://www.apache.org/dist/arrow/arrow-0.8.0/apache-arrow-0.8.0.tar.gz.asc'
+  sha512: 'https://www.apache.org/dist/arrow/arrow-0.8.0/apache-arrow-0.8.0.tar.gz.sha512'
\ No newline at end of file
diff --git a/site/img/copy.png b/site/img/copy.png
index a1e04999eb3fd..55ff71ece1e59 100644
Binary files a/site/img/copy.png and b/site/img/copy.png differ
diff --git a/site/img/copy2.png b/site/img/copy2.png
deleted file mode 100644
index 7869daddefe9f..0000000000000
Binary files a/site/img/copy2.png and /dev/null differ
diff --git a/site/img/shared.png b/site/img/shared.png
index 7869daddefe9f..b079ad0c6b4e5 100644
Binary files a/site/img/shared.png and b/site/img/shared.png differ
diff --git a/site/img/shared2.png b/site/img/shared2.png
deleted file mode 100644
index a1e04999eb3fd..0000000000000
Binary files a/site/img/shared2.png and /dev/null differ
diff --git a/site/index.html b/site/index.html
index 87995cbabed48..ec80075c59200 100644
--- a/site/index.html
+++ b/site/index.html
@@ -1,74 +1,78 @@
 ---
 layout: default
 ---
[The HTML markup in this hunk was lost in extraction; only the text content is recoverable. The rewritten landing page:
 - replaces the tagline "Powering Columnar In-Memory Analytics" with "A cross-language development platform for in-memory data";
 - renders the install button from the new site data file, "Install ({{site.data.versions['current'].number}} Release - {{site.data.versions['current'].date}})", instead of the hardcoded "Install (0.8.0 Release - December 18, 2017)";
 - keeps the "Join Mailing List" and "See Latest News" links, the description paragraph ("Apache Arrow is a cross-language development platform for in-memory data. It specifies a standardized language-independent columnar memory format for flat and hierarchical data, organized for efficient analytic operations on modern hardware. It also provides computational libraries and zero-copy streaming messaging and interprocess communication. Languages currently supported include C, C++, Java, JavaScript, Python, and Ruby."), and the "Fast", "Flexible", "Standard", "Performance Advantage of Columnar In-Memory", and "Advantages of a Common Data Layer" sections with their wording unchanged, reflowing the surrounding markup.]
diff --git a/site/install.md b/site/install.md
index ec30e0469cdc1..f795299676eb5 100644
--- a/site/install.md
+++ b/site/install.md
@@ -20,9 +20,9 @@ limitations under the License.
 {% endcomment %}
 -->
 
-## Current Version: 0.8.0
+## Current Version: {{site.data.versions['current'].number}}
 
-### Released: 18 December 2017
+### Released: {{site.data.versions['current'].date}}
 
 See the [release notes][10] for more about what's new.
 
@@ -30,7 +30,7 @@ See the [release notes][10] for more about what's new.
 
 * **Source Release**: [apache-arrow-0.8.0.tar.gz][6]
 * **Verification**: [sha512][3], [asc][7] ([verification instructions][12])
-* [Git tag 1d689e5][2]
+* [Git tag {{site.data.versions['current'].git-tag}}][2]
 * [PGP keys for release signatures][11]
 
 ### Java Packages
@@ -145,15 +145,15 @@ These repositories are managed at
 [red-data-tools/arrow-packages][9]. If you have any feedback, please
 send it to the project instead of Apache Arrow project.
 
-[1]: https://www.apache.org/dyn/closer.cgi/arrow/arrow-0.8.0/
-[2]: https://github.com/apache/arrow/releases/tag/apache-arrow-0.8.0
-[3]: https://www.apache.org/dist/arrow/arrow-0.8.0/apache-arrow-0.8.0.tar.gz.sha512
-[4]: http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.arrow%22%20AND%20v%3A%220.8.0%22
+[1]: {{site.data.versions['current'].mirrors}}
+[2]: {{site.data.versions['current'].github-tag-link}}
+[3]: {{site.data.versions['current'].sha512}}
+[4]: {{site.data.versions['current'].java-artifacts}}
 [5]: http://conda-forge.github.io
-[6]: https://www.apache.org/dyn/closer.cgi/arrow/arrow-0.8.0/apache-arrow-0.8.0.tar.gz
-[7]: https://www.apache.org/dist/arrow/arrow-0.8.0/apache-arrow-0.8.0.tar.gz.asc
+[6]: {{site.data.versions['current'].mirrors-tar}}
+[7]: {{site.data.versions['current'].asc}}
 [8]: https://github.com/red-data-tools/parquet-glib
 [9]: https://github.com/red-data-tools/arrow-packages
-[10]: http://arrow.apache.org/release/0.8.0.html
+[10]: {{site.data.versions['current'].release-notes}}
 [11]: http://www.apache.org/dist/arrow/KEYS
 [12]: https://www.apache.org/dyn/closer.cgi#verify
\ No newline at end of file