Skip to content

Commit

Permalink
PARQUET-434: Add a ParquetFileReader class
Browse files Browse the repository at this point in the history
Starting point for build out.

Author: Wes McKinney <wes@cloudera.com>

Closes apache#20 from wesm/parquet-file-reader and squashes the following commits:

8a9e875 [Wes McKinney] Add PARQUET_TEST_DATA to setup_build_env.sh and add to README
f7cd165 [Wes McKinney] Create ParquetFileReader class with ParseMetaData method and a basic unit test. Run valgrind and cpplint as part of Travis CI build

Change-Id: I4b0931b1f31c2de53703611be5281daaa25746a9
  • Loading branch information
wesm authored and nongli committed Jan 26, 2016
1 parent 6e45312 commit fea0443
Show file tree
Hide file tree
Showing 9 changed files with 361 additions and 74 deletions.
2 changes: 2 additions & 0 deletions cpp/src/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
# Headers: top level
install(FILES
parquet.h
reader.h
exception.h
DESTINATION include/parquet)

ADD_PARQUET_TEST(reader-test)
49 changes: 49 additions & 0 deletions cpp/src/parquet/exception.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef PARQUET_EXCEPTION_H
#define PARQUET_EXCEPTION_H

#include <exception>
#include <sstream>
#include <string>

namespace parquet_cpp {

class ParquetException : public std::exception {
public:
static void EofException() { throw ParquetException("Unexpected end of stream."); }
static void NYI(const std::string& msg) {
std::stringstream ss;
ss << "Not yet implemented: " << msg << ".";
throw ParquetException(ss.str());
}

explicit ParquetException(const char* msg) : msg_(msg) {}
explicit ParquetException(const std::string& msg) : msg_(msg) {}
explicit ParquetException(const char* msg, exception& e) : msg_(msg) {}

virtual ~ParquetException() throw() {}
virtual const char* what() const throw() { return msg_.c_str(); }

private:
std::string msg_;
};

} // namespace parquet_cpp

#endif // PARQUET_EXCEPTION_H
82 changes: 16 additions & 66 deletions cpp/src/parquet/parquet.h
Original file line number Diff line number Diff line change
@@ -1,45 +1,36 @@
// Copyright 2012 Cloudera Inc.
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef PARQUET_PARQUET_H
#define PARQUET_PARQUET_H

#include <exception>
#include <sstream>
#include <cstdint>
#include <cstring>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

// Needed for thrift
#include <boost/shared_ptr.hpp>

#include "parquet/exception.h"
#include "parquet/thrift/parquet_constants.h"
#include "parquet/thrift/parquet_types.h"
#include "parquet/util/rle-encoding.h"

// TCompactProtocol requires some #defines to work right.
#define SIGNED_RIGHT_SHIFT_IS 1
#define ARITHMETIC_RIGHT_SHIFT 1
#include <thrift/protocol/TCompactProtocol.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <thrift/TApplicationException.h>

#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TBufferTransports.h>

namespace std {

template <>
Expand All @@ -61,26 +52,6 @@ struct ByteArray {
const uint8_t* ptr;
};

class ParquetException : public std::exception {
public:
static void EofException() { throw ParquetException("Unexpected end of stream."); }
static void NYI(const std::string& msg) {
std::stringstream ss;
ss << "Not yet implemented: " << msg << ".";
throw ParquetException(ss.str());
}

explicit ParquetException(const char* msg) : msg_(msg) {}
explicit ParquetException(const std::string& msg) : msg_(msg) {}
explicit ParquetException(const char* msg, exception& e) : msg_(msg) {}

virtual ~ParquetException() throw() {}
virtual const char* what() const throw() { return msg_.c_str(); }

private:
std::string msg_;
};

// Interface for the column reader to get the bytes. The interface is a stream
// interface, meaning the bytes in order and once a byte is read, it does not
// need to be read again.
Expand Down Expand Up @@ -235,27 +206,6 @@ inline bool ColumnReader::ReadDefinitionRepetitionLevels(int* def_level, int* re
return *def_level == 0;
}

// Deserialize a thrift message from buf/len. buf/len must at least contain
// all the bytes needed to store the thrift message. On return, len will be
// set to the actual length of the header.
template <class T>
inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deserialized_msg) {
// Deserialize msg bytes into c++ thrift msg using memory transport.
boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> tmem_transport(
new apache::thrift::transport::TMemoryBuffer(const_cast<uint8_t*>(buf), *len));
apache::thrift::protocol::TCompactProtocolFactoryT<
apache::thrift::transport::TMemoryBuffer> tproto_factory;
boost::shared_ptr<apache::thrift::protocol::TProtocol> tproto =
tproto_factory.getProtocol(tmem_transport);
try {
deserialized_msg->read(tproto.get());
} catch (apache::thrift::protocol::TProtocolException& e) {
throw ParquetException("Couldn't deserialize thrift.", e);
}
uint32_t bytes_left = tmem_transport->available_read();
*len = *len - bytes_left;
}

} // namespace parquet_cpp

#endif
41 changes: 37 additions & 4 deletions cpp/src/parquet/reader-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,45 @@
// specific language governing permissions and limitations
// under the License.

#include <cstdlib>
#include <iostream>
#include <sstream>
#include <string>

#include <gtest/gtest.h>

namespace parquet {
#include "parquet/reader.h"

using std::string;

namespace parquet_cpp {

const char* data_dir = std::getenv("PARQUET_TEST_DATA");


class TestAllTypesPlain : public ::testing::Test {
public:
void SetUp() {
std::string dir_string(data_dir);

std::stringstream ss;
ss << dir_string << "/" << "alltypes_plain.parquet";
file_.Open(ss.str());
reader_.Open(&file_);
}

void TearDown() {
reader_.Close();
}

protected:
LocalFile file_;
ParquetFileReader reader_;
};


TEST(TestReader, ItWorks) {
ASSERT_TRUE(true);
TEST_F(TestAllTypesPlain, ParseMetaData) {
reader_.ParseMetaData();
}

} // namespace parquet
} // namespace parquet_cpp
117 changes: 117 additions & 0 deletions cpp/src/parquet/reader.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "parquet/reader.h"

#include <cstdio>
#include <vector>

#include "parquet/exception.h"
#include "parquet/thrift/util.h"

namespace parquet_cpp {

// ----------------------------------------------------------------------
// LocalFile methods

LocalFile::~LocalFile() {
// You must explicitly call Close
}

void LocalFile::Open(const std::string& path) {
path_ = path;
file_ = fopen(path_.c_str(), "r");
is_open_ = true;
}

void LocalFile::Close() {
if (is_open_) {
fclose(file_);
is_open_ = false;
}
}

size_t LocalFile::Size() {
fseek(file_, 0L, SEEK_END);
return Tell();
}

void LocalFile::Seek(size_t pos) {
fseek(file_, pos, SEEK_SET);
}

size_t LocalFile::Tell() {
return ftell(file_);
}

void LocalFile::Read(size_t nbytes, uint8_t* buffer,
size_t* bytes_read) {
*bytes_read = fread(buffer, 1, nbytes, file_);
}

// ----------------------------------------------------------------------
// ParquetFileReader

// 4 byte constant + 4 byte metadata len
static constexpr uint32_t FOOTER_SIZE = 8;
static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};

void ParquetFileReader::Open(FileLike* buffer) {
buffer_ = buffer;
}

void ParquetFileReader::Close() {
buffer_->Close();
}

void ParquetFileReader::ParseMetaData() {
size_t filesize = buffer_->Size();

if (filesize < FOOTER_SIZE) {
throw ParquetException("Corrupted file, smaller than file footer");
}

size_t bytes_read;
uint8_t footer_buffer[FOOTER_SIZE];

buffer_->Seek(filesize - FOOTER_SIZE);
buffer_->Read(FOOTER_SIZE, footer_buffer, &bytes_read);

if (bytes_read != FOOTER_SIZE) {
throw ParquetException("Invalid parquet file. Corrupt footer.");
}
if (memcmp(footer_buffer + 4, PARQUET_MAGIC, 4) != 0) {
throw ParquetException("Invalid parquet file. Corrupt footer.");
}

uint32_t metadata_len = *reinterpret_cast<uint32_t*>(footer_buffer);
size_t metadata_start = filesize - FOOTER_SIZE - metadata_len;
if (metadata_start < 0) {
throw ParquetException("Invalid parquet file. File is less than file metadata size.");
}

buffer_->Seek(metadata_start);

std::vector<uint8_t> metadata_buffer(metadata_len);
buffer_->Read(metadata_len, &metadata_buffer[0], &bytes_read);
if (bytes_read != metadata_len) {
throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
}
DeserializeThriftMsg(&metadata_buffer[0], &metadata_len, &metadata_);
}

} // namespace parquet_cpp
Loading

0 comments on commit fea0443

Please sign in to comment.