Skip to content

Commit

Permalink
PARQUET-712: Add library to read into Arrow memory
Browse files Browse the repository at this point in the history
At the moment this is just a move of the existing code into a state where it compiles. Outstanding work includes:

- [x] Understand the issues with ParquetException typeid matching in the tests *on macOS*. @wesm We already had this problem and you fixed it somewhere. Do you remember the solution?
- [x] Understand why BooleanType tests break with a Thrift exception
- [ ] Add functions that read directly into Arrow memory and not intermediate structures.

Author: Uwe L. Korn <uwelk@xhochy.com>
Author: Korn, Uwe <Uwe.Korn@blue-yonder.com>

Closes apache#158 from xhochy/PARQUET-712 and squashes the following commits:

e55ab1f [Uwe L. Korn] verbose ctest output
62f0f88 [Uwe L. Korn] Add static linkage
fc2c316 [Uwe L. Korn] Style fixes
3f3e24b [Uwe L. Korn] Fix templating problem
45de044 [Uwe L. Korn] Style fixes for IO
1d39a60 [Uwe L. Korn] Import MemoryPool instead of declaring it
251262a [Uwe L. Korn] Style fixes
e0e1518 [Uwe L. Korn] Build parquet_arrow in Travis
874b33d [Korn, Uwe] Add boost libraries for Arrow
142a364 [Korn, Uwe] PARQUET-712: Add library to read into Arrow memory

Change-Id: I893a02104d7f561fab24cf2fab2ba66dd61b6fd1
  • Loading branch information
xhochy authored and wesm committed Sep 18, 2016
1 parent e72310a commit cf93990
Show file tree
Hide file tree
Showing 14 changed files with 2,903 additions and 0 deletions.
90 changes: 90 additions & 0 deletions cpp/src/parquet/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# ----------------------------------------------------------------------
# parquet_arrow : Arrow <-> Parquet adapter

# Translation units that make up the Arrow <-> Parquet adapter.
set(PARQUET_ARROW_SRCS io.cc reader.cc schema.cc writer.cc)

# Compile the adapter sources once into an object library; library targets can
# then be assembled from $<TARGET_OBJECTS:parquet_arrow_objlib>.
add_library(parquet_arrow_objlib OBJECT ${PARQUET_ARROW_SRCS})

# SET_TARGET_PROPERTIES(parquet_arrow PROPERTIES LINKER_LANGUAGE CXX)

# Shared flavour of the adapter library (output name "parquet_arrow"), built
# from the objects of parquet_arrow_objlib.
if (PARQUET_BUILD_SHARED)
  add_library(parquet_arrow_shared SHARED $<TARGET_OBJECTS:parquet_arrow_objlib>)
  set_target_properties(parquet_arrow_shared
    PROPERTIES
    LIBRARY_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}"
    LINK_FLAGS "${SHARED_LINK_FLAGS}"
    OUTPUT_NAME "parquet_arrow")
  target_link_libraries(parquet_arrow_shared
    arrow
    arrow_io
    parquet_shared)
  if (APPLE)
    # Record an @rpath-relative install name so consumers locate the dylib
    # through their own rpath entries.
    set_target_properties(parquet_arrow_shared
      PROPERTIES
      BUILD_WITH_INSTALL_RPATH ON
      INSTALL_NAME_DIR "@rpath")
  endif()
  # Install the shared library next to the static one; without this rule only
  # the static archive and the headers end up in the install tree.
  install(TARGETS parquet_arrow_shared
    ARCHIVE DESTINATION lib
    LIBRARY DESTINATION lib)
endif()

# Static flavour of the adapter library (output name "parquet_arrow"), built
# from the objects of parquet_arrow_objlib.
if (PARQUET_BUILD_STATIC)
  add_library(parquet_arrow_static STATIC $<TARGET_OBJECTS:parquet_arrow_objlib>)
  set_target_properties(parquet_arrow_static
    PROPERTIES
    LIBRARY_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}"
    OUTPUT_NAME "parquet_arrow")
  # Mirror the shared target's dependencies (arrow, arrow_io, parquet). The
  # list previously named arrow_static twice and omitted the IO library.
  # NOTE(review): assumes an arrow_io_static target is defined by the
  # enclosing build, like arrow_static -- confirm.
  target_link_libraries(parquet_arrow_static
    arrow_static
    arrow_io_static
    parquet_static)
  install(TARGETS parquet_arrow_static
    ARCHIVE DESTINATION lib
    LIBRARY DESTINATION lib)
endif()

# Register the adapter's unit tests.
foreach(PARQUET_ARROW_TEST arrow-schema-test arrow-io-test arrow-reader-writer-test)
  ADD_PARQUET_TEST(${PARQUET_ARROW_TEST})
endforeach()

# Link each test against the static adapter library when it is built, and
# against the shared one otherwise.
if (PARQUET_BUILD_STATIC)
  foreach(PARQUET_ARROW_TEST arrow-schema-test arrow-io-test arrow-reader-writer-test)
    ADD_PARQUET_LINK_LIBRARIES(${PARQUET_ARROW_TEST} parquet_arrow_static)
  endforeach()
else()
  foreach(PARQUET_ARROW_TEST arrow-schema-test arrow-io-test arrow-reader-writer-test)
    ADD_PARQUET_LINK_LIBRARIES(${PARQUET_ARROW_TEST} parquet_arrow_shared)
  endforeach()
endif()

# Public headers of the adapter, installed under include/parquet/arrow.
install(
  FILES io.h reader.h schema.h utils.h writer.h
  DESTINATION include/parquet/arrow)

189 changes: 189 additions & 0 deletions cpp/src/parquet/arrow/arrow-io-test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <string>

#include "gtest/gtest.h"

#include "arrow/test-util.h"
#include "arrow/util/memory-pool.h"
#include "arrow/util/status.h"

#include "parquet/api/io.h"
#include "parquet/arrow/io.h"

using arrow::default_memory_pool;
using arrow::MemoryPool;
using arrow::Status;

// To assist with readability
using ArrowROFile = arrow::io::RandomAccessFile;

namespace parquet {
namespace arrow {

// Allocator tests

// A default-constructed ParquetAllocator must be able to hand out a writable
// buffer and take it back again.
TEST(TestParquetAllocator, DefaultCtor) {
  const int kNumBytes = 10;

  ParquetAllocator allocator;
  uint8_t* data = nullptr;

  ASSERT_NO_THROW(data = allocator.Malloc(kNumBytes););

  // Touch the whole buffer: valgrind reports the write if Malloc returned nullptr
  memset(data, 0, kNumBytes);

  allocator.Free(data, kNumBytes);
}

// MemoryPool that forwards every request to the default memory pool while
// keeping a running total of the bytes currently handed out.
class TrackingPool : public MemoryPool {
 public:
  TrackingPool() = default;

  // Allocates from the wrapped pool; the counter only grows when that succeeds.
  Status Allocate(int64_t size, uint8_t** out) override {
    const Status status = pool_->Allocate(size, out);
    if (!status.ok()) { return status; }
    bytes_allocated_ += size;
    return Status::OK();
  }

  // Returns the buffer to the wrapped pool and shrinks the counter by size.
  void Free(uint8_t* buffer, int64_t size) override {
    pool_->Free(buffer, size);
    bytes_allocated_ -= size;
  }

  // Net number of bytes allocated through this pool and not yet freed.
  int64_t bytes_allocated() const override { return bytes_allocated_; }

 private:
  MemoryPool* pool_ = default_memory_pool();
  int64_t bytes_allocated_ = 0;
};

// A ParquetAllocator built on a user-supplied pool must route both Malloc and
// Free through that pool, which the TrackingPool byte counter makes visible.
TEST(TestParquetAllocator, CustomPool) {
  const int kNumBytes = 10;

  TrackingPool pool;
  ParquetAllocator allocator(&pool);
  ASSERT_EQ(&pool, allocator.pool());

  uint8_t* data = nullptr;
  ASSERT_NO_THROW(data = allocator.Malloc(kNumBytes););
  ASSERT_EQ(kNumBytes, pool.bytes_allocated());

  // Touch the whole buffer: valgrind reports the write if Malloc returned nullptr
  memset(data, 0, kNumBytes);

  allocator.Free(data, kNumBytes);
  ASSERT_EQ(0, pool.bytes_allocated());
}

// ----------------------------------------------------------------------
// Read source tests

// In-memory ArrowROFile (arrow::io::RandomAccessFile) over a fixed byte range.
// Only the pointer is stored: the bytes are neither copied nor freed, so the
// caller must keep them alive for as long as the reader is used.
class BufferReader : public ArrowROFile {
 public:
  BufferReader(const uint8_t* buffer, int buffer_size)
      : buffer_(buffer), buffer_size_(buffer_size), position_(0) {}

  // Nothing to release.
  Status Close() override { return Status::OK(); }

  // Reports the current read offset.
  Status Tell(int64_t* position) override {
    *position = position_;
    return Status::OK();
  }

  // Seeks to position, then reads as Read() does; fails if the seek fails.
  Status ReadAt(
      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override {
    RETURN_NOT_OK(Seek(position));
    return Read(nbytes, bytes_read, buffer);
  }

  // Copies up to nbytes from the current offset into buffer, stores the number
  // of bytes actually copied in *bytes_read and advances the offset by it.
  Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override {
    // Clamp the request to what is left *before* copying: copying the
    // unclamped nbytes would read past the end of buffer_ (and write past the
    // bytes the caller is told about) on short reads.
    const int64_t remaining = buffer_size_ - position_;
    const int64_t to_copy = std::max<int64_t>(0, std::min<int64_t>(nbytes, remaining));
    if (to_copy > 0) {
      std::memcpy(buffer, buffer_ + position_, static_cast<std::size_t>(to_copy));
    }
    *bytes_read = to_copy;
    position_ += to_copy;
    return Status::OK();
  }

  // Reports the total number of bytes in the underlying range.
  Status GetSize(int64_t* size) override {
    *size = buffer_size_;
    return Status::OK();
  }

  // Moves the read offset; offsets outside [0, buffer_size) yield an IOError.
  Status Seek(int64_t position) override {
    if (position < 0 || position >= buffer_size_) {
      return Status::IOError("position out of bounds");
    }

    position_ = position;
    return Status::OK();
  }

 private:
  const uint8_t* buffer_;
  int64_t buffer_size_;
  int64_t position_;
};

// Exercises Open/Tell/Seek and both Read overloads of ParquetReadSource on top
// of an in-memory BufferReader.
TEST(TestParquetReadSource, Basics) {
  std::string payload = "this is the data";
  auto payload_bytes = reinterpret_cast<const uint8_t*>(payload.c_str());

  ParquetAllocator allocator(default_memory_pool());

  auto source = std::make_shared<ParquetReadSource>(&allocator);
  auto file = std::make_shared<BufferReader>(payload_bytes, payload.size());
  ASSERT_OK(source->Open(file));

  // Seeking inside the data moves the reported position
  ASSERT_EQ(0, source->Tell());
  ASSERT_NO_THROW(source->Seek(5));
  ASSERT_EQ(5, source->Tell());
  ASSERT_NO_THROW(source->Seek(0));

  // Seek out of bounds
  ASSERT_THROW(source->Seek(100), ParquetException);

  // Read into a caller-provided buffer
  uint8_t scratch[50];
  ASSERT_NO_THROW(source->Read(4, scratch));
  ASSERT_EQ(0, std::memcmp(scratch, "this", 4));
  ASSERT_EQ(4, source->Tell());

  // Read into a buffer returned by the source
  std::shared_ptr<Buffer> actual;
  ASSERT_NO_THROW(actual = source->Read(7));

  auto expected = std::make_shared<Buffer>(payload_bytes + 4, 7);
  ASSERT_TRUE(expected->Equals(*actual));
}

} // namespace arrow
} // namespace parquet
Loading

0 comments on commit cf93990

Please sign in to comment.