diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index f35af707254db..f08901ec2016a 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -18,6 +18,8 @@ # Headers: top level install(FILES parquet.h + reader.h + exception.h DESTINATION include/parquet) ADD_PARQUET_TEST(reader-test) diff --git a/cpp/src/parquet/exception.h b/cpp/src/parquet/exception.h new file mode 100644 index 0000000000000..7d9403184adba --- /dev/null +++ b/cpp/src/parquet/exception.h @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PARQUET_EXCEPTION_H +#define PARQUET_EXCEPTION_H + +#include +#include +#include + +namespace parquet_cpp { + +class ParquetException : public std::exception { + public: + static void EofException() { throw ParquetException("Unexpected end of stream."); } + static void NYI(const std::string& msg) { + std::stringstream ss; + ss << "Not yet implemented: " << msg << "."; + throw ParquetException(ss.str()); + } + + explicit ParquetException(const char* msg) : msg_(msg) {} + explicit ParquetException(const std::string& msg) : msg_(msg) {} + explicit ParquetException(const char* msg, exception& e) : msg_(msg) {} + + virtual ~ParquetException() throw() {} + virtual const char* what() const throw() { return msg_.c_str(); } + + private: + std::string msg_; +}; + +} // namespace parquet_cpp + +#endif // PARQUET_EXCEPTION_H diff --git a/cpp/src/parquet/parquet.h b/cpp/src/parquet/parquet.h index 320f0036ef04a..a1af6b7cce26b 100644 --- a/cpp/src/parquet/parquet.h +++ b/cpp/src/parquet/parquet.h @@ -1,45 +1,36 @@ -// Copyright 2012 Cloudera Inc. +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at // -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 // -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. #ifndef PARQUET_PARQUET_H #define PARQUET_PARQUET_H #include -#include #include +#include #include #include #include #include -// Needed for thrift -#include - +#include "parquet/exception.h" #include "parquet/thrift/parquet_constants.h" #include "parquet/thrift/parquet_types.h" #include "parquet/util/rle-encoding.h" -// TCompactProtocol requires some #defines to work right. -#define SIGNED_RIGHT_SHIFT_IS 1 -#define ARITHMETIC_RIGHT_SHIFT 1 -#include -#include -#include - -#include -#include - namespace std { template <> @@ -61,26 +52,6 @@ struct ByteArray { const uint8_t* ptr; }; -class ParquetException : public std::exception { - public: - static void EofException() { throw ParquetException("Unexpected end of stream."); } - static void NYI(const std::string& msg) { - std::stringstream ss; - ss << "Not yet implemented: " << msg << "."; - throw ParquetException(ss.str()); - } - - explicit ParquetException(const char* msg) : msg_(msg) {} - explicit ParquetException(const std::string& msg) : msg_(msg) {} - explicit ParquetException(const char* msg, exception& e) : msg_(msg) {} - - virtual ~ParquetException() throw() {} - virtual const char* what() const throw() { return msg_.c_str(); } - - private: - std::string msg_; -}; - // Interface for the column reader to get the bytes. The interface is a stream // interface, meaning the bytes in order and once a byte is read, it does not // need to be read again. @@ -235,27 +206,6 @@ inline bool ColumnReader::ReadDefinitionRepetitionLevels(int* def_level, int* re return *def_level == 0; } -// Deserialize a thrift message from buf/len. buf/len must at least contain -// all the bytes needed to store the thrift message. On return, len will be -// set to the actual length of the header. -template -inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deserialized_msg) { - // Deserialize msg bytes into c++ thrift msg using memory transport. - boost::shared_ptr tmem_transport( - new apache::thrift::transport::TMemoryBuffer(const_cast(buf), *len)); - apache::thrift::protocol::TCompactProtocolFactoryT< - apache::thrift::transport::TMemoryBuffer> tproto_factory; - boost::shared_ptr tproto = - tproto_factory.getProtocol(tmem_transport); - try { - deserialized_msg->read(tproto.get()); - } catch (apache::thrift::protocol::TProtocolException& e) { - throw ParquetException("Couldn't deserialize thrift.", e); - } - uint32_t bytes_left = tmem_transport->available_read(); - *len = *len - bytes_left; -} - } // namespace parquet_cpp #endif diff --git a/cpp/src/parquet/reader-test.cc b/cpp/src/parquet/reader-test.cc index f6bf8b1c9bde1..0f06f3fac7d12 100644 --- a/cpp/src/parquet/reader-test.cc +++ b/cpp/src/parquet/reader-test.cc @@ -15,12 +15,45 @@ // specific language governing permissions and limitations // under the License. +#include +#include +#include +#include + #include -namespace parquet { +#include "parquet/reader.h" + +using std::string; + +namespace parquet_cpp { + +const char* data_dir = std::getenv("PARQUET_TEST_DATA"); + + +class TestAllTypesPlain : public ::testing::Test { + public: + void SetUp() { + std::string dir_string(data_dir); + + std::stringstream ss; + ss << dir_string << "/" << "alltypes_plain.parquet"; + file_.Open(ss.str()); + reader_.Open(&file_); + } + + void TearDown() { + reader_.Close(); + } + + protected: + LocalFile file_; + ParquetFileReader reader_; +}; + -TEST(TestReader, ItWorks) { - ASSERT_TRUE(true); +TEST_F(TestAllTypesPlain, ParseMetaData) { + reader_.ParseMetaData(); } -} // namespace parquet +} // namespace parquet_cpp diff --git a/cpp/src/parquet/reader.cc b/cpp/src/parquet/reader.cc new file mode 100644 index 0000000000000..7ccd98ca4dafc --- /dev/null +++ b/cpp/src/parquet/reader.cc @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/reader.h" + +#include +#include + +#include "parquet/exception.h" +#include "parquet/thrift/util.h" + +namespace parquet_cpp { + +// ---------------------------------------------------------------------- +// LocalFile methods + +LocalFile::~LocalFile() { + // You must explicitly call Close +} + +void LocalFile::Open(const std::string& path) { + path_ = path; + file_ = fopen(path_.c_str(), "r"); + is_open_ = true; +} + +void LocalFile::Close() { + if (is_open_) { + fclose(file_); + is_open_ = false; + } +} + +size_t LocalFile::Size() { + fseek(file_, 0L, SEEK_END); + return Tell(); +} + +void LocalFile::Seek(size_t pos) { + fseek(file_, pos, SEEK_SET); +} + +size_t LocalFile::Tell() { + return ftell(file_); +} + +void LocalFile::Read(size_t nbytes, uint8_t* buffer, + size_t* bytes_read) { + *bytes_read = fread(buffer, 1, nbytes, file_); +} + +// ---------------------------------------------------------------------- +// ParquetFileReader + +// 4 byte constant + 4 byte metadata len +static constexpr uint32_t FOOTER_SIZE = 8; +static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'}; + +void ParquetFileReader::Open(FileLike* buffer) { + buffer_ = buffer; +} + +void ParquetFileReader::Close() { + buffer_->Close(); +} + +void ParquetFileReader::ParseMetaData() { + size_t filesize = buffer_->Size(); + + if (filesize < FOOTER_SIZE) { + throw ParquetException("Corrupted file, smaller than file footer"); + } + + size_t bytes_read; + uint8_t footer_buffer[FOOTER_SIZE]; + + buffer_->Seek(filesize - FOOTER_SIZE); + buffer_->Read(FOOTER_SIZE, footer_buffer, &bytes_read); + + if (bytes_read != FOOTER_SIZE) { + throw ParquetException("Invalid parquet file. Corrupt footer."); + } + if (memcmp(footer_buffer + 4, PARQUET_MAGIC, 4) != 0) { + throw ParquetException("Invalid parquet file. Corrupt footer."); + } + + uint32_t metadata_len = *reinterpret_cast(footer_buffer); + size_t metadata_start = filesize - FOOTER_SIZE - metadata_len; + if (metadata_start < 0) { + throw ParquetException("Invalid parquet file. File is less than file metadata size."); + } + + buffer_->Seek(metadata_start); + + std::vector metadata_buffer(metadata_len); + buffer_->Read(metadata_len, &metadata_buffer[0], &bytes_read); + if (bytes_read != metadata_len) { + throw ParquetException("Invalid parquet file. Could not read metadata bytes."); + } + DeserializeThriftMsg(&metadata_buffer[0], &metadata_len, &metadata_); +} + +} // namespace parquet_cpp diff --git a/cpp/src/parquet/reader.h b/cpp/src/parquet/reader.h new file mode 100644 index 0000000000000..4a40e0482bad0 --- /dev/null +++ b/cpp/src/parquet/reader.h @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PARQUET_FILE_READER_H +#define PARQUET_FILE_READER_H + +#include +#include +#include + +#include "parquet/thrift/parquet_types.h" +#include "parquet/parquet.h" + +namespace parquet_cpp { + +class FileLike { + public: + virtual ~FileLike() {} + + virtual void Close() = 0; + virtual size_t Size() = 0; + virtual size_t Tell() = 0; + virtual void Seek(size_t pos) = 0; + virtual void Read(size_t nbytes, uint8_t* out, size_t* bytes_read) = 0; +}; + + +class LocalFile : public FileLike { + public: + LocalFile() : file_(nullptr), is_open_(false) {} + virtual ~LocalFile(); + + void Open(const std::string& path); + + virtual void Close(); + virtual size_t Size(); + virtual size_t Tell(); + virtual void Seek(size_t pos); + virtual void Read(size_t nbytes, uint8_t* out, size_t* bytes_read); + + bool is_open() const { return is_open_;} + const std::string& path() const { return path_;} + + private: + std::string path_; + FILE* file_; + bool is_open_; +}; + + +class ParquetFileReader { + public: + ParquetFileReader() : buffer_(nullptr) {} + ~ParquetFileReader() {} + + // The class takes ownership of the passed file-like object + void Open(FileLike* buffer); + + void Close(); + + void ParseMetaData(); + + const parquet::FileMetaData& metadata() const { + return metadata_; + } + + private: + parquet::FileMetaData metadata_; + FileLike* buffer_; +}; + + +} // namespace parquet_cpp + +#endif // PARQUET_FILE_READER_H diff --git a/cpp/src/parquet/thrift/CMakeLists.txt b/cpp/src/parquet/thrift/CMakeLists.txt index e2a00c9e1c6b0..01e685e391a9c 100644 --- a/cpp/src/parquet/thrift/CMakeLists.txt +++ b/cpp/src/parquet/thrift/CMakeLists.txt @@ -26,4 +26,5 @@ set_target_properties(parquet_thrift install(FILES parquet_types.h parquet_constants.h + util.h DESTINATION include/parquet/thrift) diff --git a/cpp/src/parquet/thrift/util.h b/cpp/src/parquet/thrift/util.h new file mode 100644 index 0000000000000..ecf24c65cd00f --- /dev/null +++ b/cpp/src/parquet/thrift/util.h @@ -0,0 +1,46 @@ +#ifndef PARQUET_THRIFT_UTIL_H +#define PARQUET_THRIFT_UTIL_H + +#include + +// Needed for thrift +#include + +// TCompactProtocol requires some #defines to work right. +#define SIGNED_RIGHT_SHIFT_IS 1 +#define ARITHMETIC_RIGHT_SHIFT 1 +#include +#include +#include + +#include +#include + +#include "parquet/exception.h" + +namespace parquet_cpp { + +// Deserialize a thrift message from buf/len. buf/len must at least contain +// all the bytes needed to store the thrift message. On return, len will be +// set to the actual length of the header. +template +inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deserialized_msg) { + // Deserialize msg bytes into c++ thrift msg using memory transport. + boost::shared_ptr tmem_transport( + new apache::thrift::transport::TMemoryBuffer(const_cast(buf), *len)); + apache::thrift::protocol::TCompactProtocolFactoryT< + apache::thrift::transport::TMemoryBuffer> tproto_factory; + boost::shared_ptr tproto = + tproto_factory.getProtocol(tmem_transport); + try { + deserialized_msg->read(tproto.get()); + } catch (apache::thrift::protocol::TProtocolException& e) { + throw ParquetException("Couldn't deserialize thrift.", e); + } + uint32_t bytes_left = tmem_transport->available_read(); + *len = *len - bytes_left; +} + +} // namespace parquet_cpp + +#endif // PARQUET_THRIFT_UTIL_H diff --git a/cpp/src/parquet/util/CMakeLists.txt b/cpp/src/parquet/util/CMakeLists.txt index 1b712f715f6a3..766214bcdaae5 100644 --- a/cpp/src/parquet/util/CMakeLists.txt +++ b/cpp/src/parquet/util/CMakeLists.txt @@ -28,14 +28,14 @@ add_library(parquet_test_main if (APPLE) target_link_libraries(parquet_test_main - gtest - dl) + gtest + dl) set_target_properties(parquet_test_main - PROPERTIES LINK_FLAGS "-undefined dynamic_lookup") + PROPERTIES LINK_FLAGS "-undefined dynamic_lookup") else() target_link_libraries(parquet_test_main dl - gtest + gtest pthread ) endif()