forked from apache/arrow
-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
PARQUET-712: Add library to read into Arrow memory
At the moment this is just move of the existing code into the state where it compiles. Outstanding work includes: - [x] Understand the issues with ParquetException typeid matching in the tests *on macOS*. @wesm We already had this problem and you fixed it somewhere. Do you remember the solution? - [x] Understand why BoolenType tests break with a Thrift exception - [ ] Add functions that read directly into Arrow memory and not intermediate structures. Author: Uwe L. Korn <uwelk@xhochy.com> Author: Korn, Uwe <Uwe.Korn@blue-yonder.com> Closes apache#158 from xhochy/PARQUET-712 and squashes the following commits: e55ab1f [Uwe L. Korn] verbose ctest output 62f0f88 [Uwe L. Korn] Add static linkage fc2c316 [Uwe L. Korn] Style fixes 3f3e24b [Uwe L. Korn] Fix templating problem 45de044 [Uwe L. Korn] Style fixes for IO 1d39a60 [Uwe L. Korn] Import MemoryPool instead of declaring it 251262a [Uwe L. Korn] Style fixes e0e1518 [Uwe L. Korn] Build parquet_arrow in Travis 874b33d [Korn, Uwe] Add boost libraries for Arrow 142a364 [Korn, Uwe] PARQUET-712: Add library to read into Arrow memory Change-Id: I893a02104d7f561fab24cf2fab2ba66dd61b6fd1
- Loading branch information
Showing
14 changed files
with
2,903 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
|
||
# ---------------------------------------------------------------------- | ||
# parquet_arrow : Arrow <-> Parquet adapter | ||
|
||
set(PARQUET_ARROW_SRCS | ||
io.cc | ||
reader.cc | ||
schema.cc | ||
writer.cc | ||
) | ||
|
||
add_library(parquet_arrow_objlib OBJECT | ||
${PARQUET_ARROW_SRCS} | ||
) | ||
|
||
# SET_TARGET_PROPERTIES(parquet_arrow PROPERTIES LINKER_LANGUAGE CXX) | ||
|
||
if (PARQUET_BUILD_SHARED) | ||
add_library(parquet_arrow_shared SHARED $<TARGET_OBJECTS:parquet_arrow_objlib>) | ||
set_target_properties(parquet_arrow_shared | ||
PROPERTIES | ||
LIBRARY_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}" | ||
LINK_FLAGS "${SHARED_LINK_FLAGS}" | ||
OUTPUT_NAME "parquet_arrow") | ||
target_link_libraries(parquet_arrow_shared | ||
arrow | ||
arrow_io | ||
parquet_shared) | ||
if (APPLE) | ||
set_target_properties(parquet_arrow_shared | ||
PROPERTIES | ||
BUILD_WITH_INSTALL_RPATH ON | ||
INSTALL_NAME_DIR "@rpath") | ||
endif() | ||
endif() | ||
|
||
if (PARQUET_BUILD_STATIC) | ||
add_library(parquet_arrow_static STATIC $<TARGET_OBJECTS:parquet_arrow_objlib>) | ||
set_target_properties(parquet_arrow_static | ||
PROPERTIES | ||
LIBRARY_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}" | ||
OUTPUT_NAME "parquet_arrow") | ||
target_link_libraries(parquet_arrow_static | ||
arrow_static | ||
arrow_static | ||
parquet_static) | ||
install(TARGETS parquet_arrow_static | ||
ARCHIVE DESTINATION lib | ||
LIBRARY DESTINATION lib) | ||
endif() | ||
|
||
ADD_PARQUET_TEST(arrow-schema-test) | ||
ADD_PARQUET_TEST(arrow-io-test) | ||
ADD_PARQUET_TEST(arrow-reader-writer-test) | ||
|
||
if (PARQUET_BUILD_STATIC) | ||
ADD_PARQUET_LINK_LIBRARIES(arrow-schema-test parquet_arrow_static) | ||
ADD_PARQUET_LINK_LIBRARIES(arrow-io-test parquet_arrow_static) | ||
ADD_PARQUET_LINK_LIBRARIES(arrow-reader-writer-test parquet_arrow_static) | ||
else() | ||
ADD_PARQUET_LINK_LIBRARIES(arrow-schema-test parquet_arrow_shared) | ||
ADD_PARQUET_LINK_LIBRARIES(arrow-io-test parquet_arrow_shared) | ||
ADD_PARQUET_LINK_LIBRARIES(arrow-reader-writer-test parquet_arrow_shared) | ||
endif() | ||
|
||
# Headers: top level | ||
install(FILES | ||
io.h | ||
reader.h | ||
schema.h | ||
utils.h | ||
writer.h | ||
DESTINATION include/parquet/arrow) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,189 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
#include <cstdint> | ||
#include <cstdlib> | ||
#include <memory> | ||
#include <string> | ||
|
||
#include "gtest/gtest.h" | ||
|
||
#include "arrow/test-util.h" | ||
#include "arrow/util/memory-pool.h" | ||
#include "arrow/util/status.h" | ||
|
||
#include "parquet/api/io.h" | ||
#include "parquet/arrow/io.h" | ||
|
||
using arrow::default_memory_pool; | ||
using arrow::MemoryPool; | ||
using arrow::Status; | ||
|
||
// To assist with readability | ||
using ArrowROFile = arrow::io::RandomAccessFile; | ||
|
||
namespace parquet { | ||
namespace arrow { | ||
|
||
// Allocator tests | ||
|
||
TEST(TestParquetAllocator, DefaultCtor) { | ||
ParquetAllocator allocator; | ||
|
||
const int buffer_size = 10; | ||
|
||
uint8_t* buffer = nullptr; | ||
ASSERT_NO_THROW(buffer = allocator.Malloc(buffer_size);); | ||
|
||
// valgrind will complain if we write into nullptr | ||
memset(buffer, 0, buffer_size); | ||
|
||
allocator.Free(buffer, buffer_size); | ||
} | ||
|
||
// Pass through to the default memory pool | ||
class TrackingPool : public MemoryPool { | ||
public: | ||
TrackingPool() : pool_(default_memory_pool()), bytes_allocated_(0) {} | ||
|
||
Status Allocate(int64_t size, uint8_t** out) override { | ||
RETURN_NOT_OK(pool_->Allocate(size, out)); | ||
bytes_allocated_ += size; | ||
return Status::OK(); | ||
} | ||
|
||
void Free(uint8_t* buffer, int64_t size) override { | ||
pool_->Free(buffer, size); | ||
bytes_allocated_ -= size; | ||
} | ||
|
||
int64_t bytes_allocated() const override { return bytes_allocated_; } | ||
|
||
private: | ||
MemoryPool* pool_; | ||
int64_t bytes_allocated_; | ||
}; | ||
|
||
TEST(TestParquetAllocator, CustomPool) { | ||
TrackingPool pool; | ||
|
||
ParquetAllocator allocator(&pool); | ||
|
||
ASSERT_EQ(&pool, allocator.pool()); | ||
|
||
const int buffer_size = 10; | ||
|
||
uint8_t* buffer = nullptr; | ||
ASSERT_NO_THROW(buffer = allocator.Malloc(buffer_size);); | ||
|
||
ASSERT_EQ(buffer_size, pool.bytes_allocated()); | ||
|
||
// valgrind will complain if we write into nullptr | ||
memset(buffer, 0, buffer_size); | ||
|
||
allocator.Free(buffer, buffer_size); | ||
|
||
ASSERT_EQ(0, pool.bytes_allocated()); | ||
} | ||
|
||
// ---------------------------------------------------------------------- | ||
// Read source tests | ||
|
||
class BufferReader : public ArrowROFile { | ||
public: | ||
BufferReader(const uint8_t* buffer, int buffer_size) | ||
: buffer_(buffer), buffer_size_(buffer_size), position_(0) {} | ||
|
||
Status Close() override { | ||
// no-op | ||
return Status::OK(); | ||
} | ||
|
||
Status Tell(int64_t* position) override { | ||
*position = position_; | ||
return Status::OK(); | ||
} | ||
|
||
Status ReadAt( | ||
int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override { | ||
RETURN_NOT_OK(Seek(position)); | ||
return Read(nbytes, bytes_read, buffer); | ||
} | ||
|
||
Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override { | ||
memcpy(buffer, buffer_ + position_, nbytes); | ||
*bytes_read = std::min(nbytes, buffer_size_ - position_); | ||
position_ += *bytes_read; | ||
return Status::OK(); | ||
} | ||
|
||
Status GetSize(int64_t* size) override { | ||
*size = buffer_size_; | ||
return Status::OK(); | ||
} | ||
|
||
Status Seek(int64_t position) override { | ||
if (position < 0 || position >= buffer_size_) { | ||
return Status::IOError("position out of bounds"); | ||
} | ||
|
||
position_ = position; | ||
return Status::OK(); | ||
} | ||
|
||
private: | ||
const uint8_t* buffer_; | ||
int buffer_size_; | ||
int64_t position_; | ||
}; | ||
|
||
TEST(TestParquetReadSource, Basics) { | ||
std::string data = "this is the data"; | ||
auto data_buffer = reinterpret_cast<const uint8_t*>(data.c_str()); | ||
|
||
ParquetAllocator allocator(default_memory_pool()); | ||
|
||
auto file = std::make_shared<BufferReader>(data_buffer, data.size()); | ||
auto source = std::make_shared<ParquetReadSource>(&allocator); | ||
|
||
ASSERT_OK(source->Open(file)); | ||
|
||
ASSERT_EQ(0, source->Tell()); | ||
ASSERT_NO_THROW(source->Seek(5)); | ||
ASSERT_EQ(5, source->Tell()); | ||
ASSERT_NO_THROW(source->Seek(0)); | ||
|
||
// Seek out of bounds | ||
ASSERT_THROW(source->Seek(100), ParquetException); | ||
|
||
uint8_t buffer[50]; | ||
|
||
ASSERT_NO_THROW(source->Read(4, buffer)); | ||
ASSERT_EQ(0, std::memcmp(buffer, "this", 4)); | ||
ASSERT_EQ(4, source->Tell()); | ||
|
||
std::shared_ptr<Buffer> pq_buffer; | ||
|
||
ASSERT_NO_THROW(pq_buffer = source->Read(7)); | ||
|
||
auto expected_buffer = std::make_shared<Buffer>(data_buffer + 4, 7); | ||
|
||
ASSERT_TRUE(expected_buffer->Equals(*pq_buffer.get())); | ||
} | ||
|
||
} // namespace arrow | ||
} // namespace parquet |
Oops, something went wrong.